You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ch...@apache.org on 2006/02/22 00:40:29 UTC

svn commit: r379627 [22/34] - in /incubator/servicemix/trunk: ./ etc/ sandbox/servicemix-wsn-1.2/src/sa/META-INF/ sandbox/servicemix-wsn-1.2/src/su/META-INF/ servicemix-assembly/ servicemix-assembly/src/main/assembly/ servicemix-assembly/src/main/relea...

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractNotificationBroker.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractNotificationBroker.java?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractNotificationBroker.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractNotificationBroker.java Tue Feb 21 15:40:05 2006
@@ -1,276 +1,276 @@
-/*
- * Copyright 2005-2006 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicemix.wsn;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.jws.Oneway;
-import javax.jws.WebMethod;
-import javax.jws.WebParam;
-import javax.jws.WebResult;
-import javax.jws.WebService;
-
-import org.apache.activemq.util.IdGenerator;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.servicemix.wsn.jaxws.InvalidFilterFault;
-import org.apache.servicemix.wsn.jaxws.InvalidMessageContentExpressionFault;
-import org.apache.servicemix.wsn.jaxws.InvalidProducerPropertiesExpressionFault;
-import org.apache.servicemix.wsn.jaxws.InvalidTopicExpressionFault;
-import org.apache.servicemix.wsn.jaxws.MultipleTopicsSpecifiedFault;
-import org.apache.servicemix.wsn.jaxws.NoCurrentMessageOnTopicFault;
-import org.apache.servicemix.wsn.jaxws.NotificationBroker;
-import org.apache.servicemix.wsn.jaxws.PublisherRegistrationFailedFault;
-import org.apache.servicemix.wsn.jaxws.PublisherRegistrationRejectedFault;
-import org.apache.servicemix.wsn.jaxws.ResourceNotDestroyedFault;
-import org.apache.servicemix.wsn.jaxws.ResourceUnknownFault;
-import org.apache.servicemix.wsn.jaxws.SubscribeCreationFailedFault;
-import org.apache.servicemix.wsn.jaxws.TopicExpressionDialectUnknownFault;
-import org.apache.servicemix.wsn.jaxws.TopicNotSupportedFault;
-import org.apache.servicemix.wsn.jaxws.UnableToDestroySubscriptionFault;
-import org.apache.servicemix.wsn.jaxws.UnacceptableInitialTerminationTimeFault;
-import org.oasis_open.docs.wsn.b_2.GetCurrentMessage;
-import org.oasis_open.docs.wsn.b_2.GetCurrentMessageResponse;
-import org.oasis_open.docs.wsn.b_2.NoCurrentMessageOnTopicFaultType;
-import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
-import org.oasis_open.docs.wsn.b_2.Notify;
-import org.oasis_open.docs.wsn.b_2.Subscribe;
-import org.oasis_open.docs.wsn.b_2.SubscribeCreationFailedFaultType;
-import org.oasis_open.docs.wsn.b_2.SubscribeResponse;
-import org.oasis_open.docs.wsn.br_2.PublisherRegistrationFailedFaultType;
-import org.oasis_open.docs.wsn.br_2.RegisterPublisher;
-import org.oasis_open.docs.wsn.br_2.RegisterPublisherResponse;
-import org.w3._2005._08.addressing.EndpointReferenceType;
-
-@WebService(endpointInterface = "org.apache.servicemix.wsn.jaxws.NotificationBroker")
-public abstract class AbstractNotificationBroker extends AbstractEndpoint implements NotificationBroker {
-
-	private static Log log = LogFactory.getLog(AbstractNotificationBroker.class);
-	
-    private IdGenerator idGenerator;
-    private AbstractPublisher anonymousPublisher;
-    private Map<String,AbstractPublisher> publishers;
-    private Map<String,AbstractSubscription> subscriptions;
-
-	public AbstractNotificationBroker(String name) {
-		super(name);
-        idGenerator = new IdGenerator();
-        subscriptions = new ConcurrentHashMap<String,AbstractSubscription>();
-        publishers = new ConcurrentHashMap<String, AbstractPublisher>();
-	}
-
-    public void init() throws Exception {
-        register();
-        anonymousPublisher = createPublisher("Anonymous");
-        anonymousPublisher.register();
-    }
-    
-	protected String createAddress() {
-		return "http://servicemix.org/wsnotification/NotificationBroker/" + getName();
-	}
-	
-    /**
-     * 
-     * @param notify
-     */
-    @WebMethod(operationName = "Notify")
-    @Oneway
-    public void notify(
-        @WebParam(name = "Notify", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "Notify")
-        Notify notify) {
-    	
-    	log.debug("Notify");
-    	handleNotify(notify);
-    }
-    
-	protected void handleNotify(Notify notify) {
-		for (NotificationMessageHolderType messageHolder : notify.getNotificationMessage()) {
-            EndpointReferenceType producerReference = messageHolder.getProducerReference();
-            AbstractPublisher publisher = getPublisher(producerReference);
-            if (publisher != null) {
-            	publisher.notify(messageHolder);
-            }
-		}
-	}
-
-	protected AbstractPublisher getPublisher(EndpointReferenceType producerReference) {
-		AbstractPublisher publisher = null;
-		if (producerReference != null && 
-			producerReference.getAddress() != null &&
-			producerReference.getAddress().getValue() != null) {
-			String address = producerReference.getAddress().getValue();
-			publishers.get(address);
-		}
-		if (publisher == null) {
-			publisher = anonymousPublisher;
-		}
-		return publisher;
-	}
-	
-    /**
-     * 
-     * @param subscribeRequest
-     * @return
-     *     returns org.oasis_open.docs.wsn.b_1.SubscribeResponse
-     * @throws SubscribeCreationFailedFault
-     * @throws InvalidTopicExpressionFault
-     * @throws TopicNotSupportedFault
-     * @throws InvalidFilterFault
-     * @throws InvalidProducerPropertiesExpressionFault
-     * @throws ResourceUnknownFault
-     * @throws InvalidUseRawValueFault
-     * @throws InvalidMessageContentExpressionFault
-     * @throws TopicExpressionDialectUnknownFault
-     * @throws UnacceptableInitialTerminationTimeFault
-     */
-    @WebMethod(operationName = "Subscribe")
-    @WebResult(name = "SubscribeResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "SubscribeResponse")
-    public SubscribeResponse subscribe(
-        @WebParam(name = "Subscribe", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "SubscribeRequest")
-        Subscribe subscribeRequest)
-        throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, ResourceUnknownFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
-    	
-    	log.debug("Subscribe");
-    	return handleSubscribe(subscribeRequest, null);
-    }
-    
-	public SubscribeResponse handleSubscribe(Subscribe subscribeRequest,
-                                             EndpointManager manager) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
-		AbstractSubscription subscription = null;
-		boolean success = false;
-		try {
-			subscription = createSubcription(idGenerator.generateSanitizedId());
-            subscription.setBroker(this);
-			subscriptions.put(subscription.getAddress(), subscription);
-			subscription.create(subscribeRequest);
-            if (manager != null) {
-                subscription.setManager(manager);
-            }
-            subscription.register();
-			SubscribeResponse response = new SubscribeResponse();
-			response.setSubscriptionReference(createEndpointReference(subscription.getAddress()));
-			success = true;
-			return response;
-		} catch (EndpointRegistrationException e) {
-			SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType();
-			throw new SubscribeCreationFailedFault("Unable to register endpoint", fault, e);
-		} finally {
-			if (!success && subscription != null) {
-				subscriptions.remove(subscription);
-				try {
-					subscription.unsubscribe();
-				} catch (UnableToDestroySubscriptionFault e) {
-					log.info("Error destroying subscription", e);
-				}
-			}
-		}
-	}
-    
-    public void unsubscribe(String address) throws UnableToDestroySubscriptionFault {
-        AbstractSubscription subscription = (AbstractSubscription) subscriptions.remove(address);
-        if (subscription != null) {
-            subscription.unsubscribe();
-        }
-    }
-	
-	/**
-     * 
-     * @param getCurrentMessageRequest
-     * @return
-     *     returns org.oasis_open.docs.wsn.b_1.GetCurrentMessageResponse
-     * @throws MultipleTopicsSpecifiedFault
-     * @throws TopicNotSupportedFault
-     * @throws InvalidTopicExpressionFault
-     * @throws ResourceUnknownFault
-     * @throws TopicExpressionDialectUnknownFault
-     * @throws NoCurrentMessageOnTopicFault
-     */
-    @WebMethod(operationName = "GetCurrentMessage")
-    @WebResult(name = "GetCurrentMessageResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "GetCurrentMessageResponse")
-    public GetCurrentMessageResponse getCurrentMessage(
-        @WebParam(name = "GetCurrentMessage", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "GetCurrentMessageRequest")
-        GetCurrentMessage getCurrentMessageRequest)
-        throws InvalidTopicExpressionFault, MultipleTopicsSpecifiedFault, NoCurrentMessageOnTopicFault, ResourceUnknownFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault {
-    	
-    	log.debug("GetCurrentMessage");
-    	NoCurrentMessageOnTopicFaultType fault = new NoCurrentMessageOnTopicFaultType();
-    	throw new NoCurrentMessageOnTopicFault("There is no current message on this topic.", fault);
-    }
-
-    /**
-     * 
-     * @param registerPublisherRequest
-     * @return
-     *     returns org.oasis_open.docs.wsn.br_1.RegisterPublisherResponse
-     * @throws PublisherRegistrationRejectedFault
-     * @throws InvalidTopicExpressionFault
-     * @throws TopicNotSupportedFault
-     * @throws ResourceUnknownFault
-     * @throws PublisherRegistrationFailedFault
-     */
-    @WebMethod(operationName = "RegisterPublisher")
-    @WebResult(name = "RegisterPublisherResponse", targetNamespace = "http://docs.oasis-open.org/wsn/br-1", partName = "RegisterPublisherResponse")
-    public RegisterPublisherResponse registerPublisher(
-        @WebParam(name = "RegisterPublisher", targetNamespace = "http://docs.oasis-open.org/wsn/br-1", partName = "RegisterPublisherRequest")
-        RegisterPublisher registerPublisherRequest)
-        throws InvalidTopicExpressionFault, PublisherRegistrationFailedFault, PublisherRegistrationRejectedFault, ResourceUnknownFault, TopicNotSupportedFault {
-    	
-    	log.debug("RegisterPublisher");
-        return handleRegisterPublisher(registerPublisherRequest, null);
-    }
-    
-    public RegisterPublisherResponse handleRegisterPublisher(
-                        RegisterPublisher registerPublisherRequest,
-                        EndpointManager manager) throws InvalidTopicExpressionFault, 
-                                                        PublisherRegistrationFailedFault, 
-                                                        PublisherRegistrationRejectedFault, 
-                                                        ResourceUnknownFault, 
-                                                        TopicNotSupportedFault {
-        AbstractPublisher publisher = null;
-        boolean success = false;
-        try {
-            publisher = createPublisher(idGenerator.generateSanitizedId());
-            publishers.put(publisher.getAddress(), publisher);
-            if (manager != null) {
-                publisher.setManager(manager);
-            }
-            publisher.register();
-            publisher.create(registerPublisherRequest);
-            RegisterPublisherResponse response = new RegisterPublisherResponse(); 
-            response.setPublisherRegistrationReference(createEndpointReference(publisher.getAddress()));
-            success = true;
-            return response;
-        } catch (EndpointRegistrationException e) {
-            PublisherRegistrationFailedFaultType fault = new PublisherRegistrationFailedFaultType();
-            throw new PublisherRegistrationFailedFault("Unable to register new endpoint", fault, e);
-        } finally {
-            if (!success && publisher != null) {
-                publishers.remove(publisher.getAddress());
-                try {
-                    publisher.destroy();
-                } catch (ResourceNotDestroyedFault e) {
-                    log.info("Error destroying publisher", e);
-                }
-            }
-        }
-    }
-
-	protected abstract AbstractPublisher createPublisher(String name);
-	
-	protected abstract AbstractSubscription createSubcription(String name);
-
-}
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.wsn;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jws.Oneway;
+import javax.jws.WebMethod;
+import javax.jws.WebParam;
+import javax.jws.WebResult;
+import javax.jws.WebService;
+
+import org.apache.activemq.util.IdGenerator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.wsn.jaxws.InvalidFilterFault;
+import org.apache.servicemix.wsn.jaxws.InvalidMessageContentExpressionFault;
+import org.apache.servicemix.wsn.jaxws.InvalidProducerPropertiesExpressionFault;
+import org.apache.servicemix.wsn.jaxws.InvalidTopicExpressionFault;
+import org.apache.servicemix.wsn.jaxws.MultipleTopicsSpecifiedFault;
+import org.apache.servicemix.wsn.jaxws.NoCurrentMessageOnTopicFault;
+import org.apache.servicemix.wsn.jaxws.NotificationBroker;
+import org.apache.servicemix.wsn.jaxws.PublisherRegistrationFailedFault;
+import org.apache.servicemix.wsn.jaxws.PublisherRegistrationRejectedFault;
+import org.apache.servicemix.wsn.jaxws.ResourceNotDestroyedFault;
+import org.apache.servicemix.wsn.jaxws.ResourceUnknownFault;
+import org.apache.servicemix.wsn.jaxws.SubscribeCreationFailedFault;
+import org.apache.servicemix.wsn.jaxws.TopicExpressionDialectUnknownFault;
+import org.apache.servicemix.wsn.jaxws.TopicNotSupportedFault;
+import org.apache.servicemix.wsn.jaxws.UnableToDestroySubscriptionFault;
+import org.apache.servicemix.wsn.jaxws.UnacceptableInitialTerminationTimeFault;
+import org.oasis_open.docs.wsn.b_2.GetCurrentMessage;
+import org.oasis_open.docs.wsn.b_2.GetCurrentMessageResponse;
+import org.oasis_open.docs.wsn.b_2.NoCurrentMessageOnTopicFaultType;
+import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
+import org.oasis_open.docs.wsn.b_2.Notify;
+import org.oasis_open.docs.wsn.b_2.Subscribe;
+import org.oasis_open.docs.wsn.b_2.SubscribeCreationFailedFaultType;
+import org.oasis_open.docs.wsn.b_2.SubscribeResponse;
+import org.oasis_open.docs.wsn.br_2.PublisherRegistrationFailedFaultType;
+import org.oasis_open.docs.wsn.br_2.RegisterPublisher;
+import org.oasis_open.docs.wsn.br_2.RegisterPublisherResponse;
+import org.w3._2005._08.addressing.EndpointReferenceType;
+
+@WebService(endpointInterface = "org.apache.servicemix.wsn.jaxws.NotificationBroker")
+public abstract class AbstractNotificationBroker extends AbstractEndpoint implements NotificationBroker {
+
+	private static Log log = LogFactory.getLog(AbstractNotificationBroker.class);
+	
+    private IdGenerator idGenerator;
+    private AbstractPublisher anonymousPublisher;
+    private Map<String,AbstractPublisher> publishers;
+    private Map<String,AbstractSubscription> subscriptions;
+
+	public AbstractNotificationBroker(String name) {
+		super(name);
+        idGenerator = new IdGenerator();
+        subscriptions = new ConcurrentHashMap<String,AbstractSubscription>();
+        publishers = new ConcurrentHashMap<String, AbstractPublisher>();
+	}
+
+    public void init() throws Exception {
+        register();
+        anonymousPublisher = createPublisher("Anonymous");
+        anonymousPublisher.register();
+    }
+    
+	protected String createAddress() {
+		return "http://servicemix.org/wsnotification/NotificationBroker/" + getName();
+	}
+	
+    /**
+     * 
+     * @param notify
+     */
+    @WebMethod(operationName = "Notify")
+    @Oneway
+    public void notify(
+        @WebParam(name = "Notify", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "Notify")
+        Notify notify) {
+    	
+    	log.debug("Notify");
+    	handleNotify(notify);
+    }
+    
+	protected void handleNotify(Notify notify) {
+		for (NotificationMessageHolderType messageHolder : notify.getNotificationMessage()) {
+            EndpointReferenceType producerReference = messageHolder.getProducerReference();
+            AbstractPublisher publisher = getPublisher(producerReference);
+            if (publisher != null) {
+            	publisher.notify(messageHolder);
+            }
+		}
+	}
+
+	protected AbstractPublisher getPublisher(EndpointReferenceType producerReference) {
+		AbstractPublisher publisher = null;
+		if (producerReference != null && 
+			producerReference.getAddress() != null &&
+			producerReference.getAddress().getValue() != null) {
+			String address = producerReference.getAddress().getValue();
+			publishers.get(address);
+		}
+		if (publisher == null) {
+			publisher = anonymousPublisher;
+		}
+		return publisher;
+	}
+	
+    /**
+     * 
+     * @param subscribeRequest
+     * @return
+     *     returns org.oasis_open.docs.wsn.b_1.SubscribeResponse
+     * @throws SubscribeCreationFailedFault
+     * @throws InvalidTopicExpressionFault
+     * @throws TopicNotSupportedFault
+     * @throws InvalidFilterFault
+     * @throws InvalidProducerPropertiesExpressionFault
+     * @throws ResourceUnknownFault
+     * @throws InvalidUseRawValueFault
+     * @throws InvalidMessageContentExpressionFault
+     * @throws TopicExpressionDialectUnknownFault
+     * @throws UnacceptableInitialTerminationTimeFault
+     */
+    @WebMethod(operationName = "Subscribe")
+    @WebResult(name = "SubscribeResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "SubscribeResponse")
+    public SubscribeResponse subscribe(
+        @WebParam(name = "Subscribe", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "SubscribeRequest")
+        Subscribe subscribeRequest)
+        throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, ResourceUnknownFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
+    	
+    	log.debug("Subscribe");
+    	return handleSubscribe(subscribeRequest, null);
+    }
+    
+	public SubscribeResponse handleSubscribe(Subscribe subscribeRequest,
+                                             EndpointManager manager) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
+		AbstractSubscription subscription = null;
+		boolean success = false;
+		try {
+			subscription = createSubcription(idGenerator.generateSanitizedId());
+            subscription.setBroker(this);
+			subscriptions.put(subscription.getAddress(), subscription);
+			subscription.create(subscribeRequest);
+            if (manager != null) {
+                subscription.setManager(manager);
+            }
+            subscription.register();
+			SubscribeResponse response = new SubscribeResponse();
+			response.setSubscriptionReference(createEndpointReference(subscription.getAddress()));
+			success = true;
+			return response;
+		} catch (EndpointRegistrationException e) {
+			SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType();
+			throw new SubscribeCreationFailedFault("Unable to register endpoint", fault, e);
+		} finally {
+			if (!success && subscription != null) {
+				subscriptions.remove(subscription);
+				try {
+					subscription.unsubscribe();
+				} catch (UnableToDestroySubscriptionFault e) {
+					log.info("Error destroying subscription", e);
+				}
+			}
+		}
+	}
+    
+    public void unsubscribe(String address) throws UnableToDestroySubscriptionFault {
+        AbstractSubscription subscription = (AbstractSubscription) subscriptions.remove(address);
+        if (subscription != null) {
+            subscription.unsubscribe();
+        }
+    }
+	
+	/**
+     * 
+     * @param getCurrentMessageRequest
+     * @return
+     *     returns org.oasis_open.docs.wsn.b_1.GetCurrentMessageResponse
+     * @throws MultipleTopicsSpecifiedFault
+     * @throws TopicNotSupportedFault
+     * @throws InvalidTopicExpressionFault
+     * @throws ResourceUnknownFault
+     * @throws TopicExpressionDialectUnknownFault
+     * @throws NoCurrentMessageOnTopicFault
+     */
+    @WebMethod(operationName = "GetCurrentMessage")
+    @WebResult(name = "GetCurrentMessageResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "GetCurrentMessageResponse")
+    public GetCurrentMessageResponse getCurrentMessage(
+        @WebParam(name = "GetCurrentMessage", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "GetCurrentMessageRequest")
+        GetCurrentMessage getCurrentMessageRequest)
+        throws InvalidTopicExpressionFault, MultipleTopicsSpecifiedFault, NoCurrentMessageOnTopicFault, ResourceUnknownFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault {
+    	
+    	log.debug("GetCurrentMessage");
+    	NoCurrentMessageOnTopicFaultType fault = new NoCurrentMessageOnTopicFaultType();
+    	throw new NoCurrentMessageOnTopicFault("There is no current message on this topic.", fault);
+    }
+
+    /**
+     * 
+     * @param registerPublisherRequest
+     * @return
+     *     returns org.oasis_open.docs.wsn.br_1.RegisterPublisherResponse
+     * @throws PublisherRegistrationRejectedFault
+     * @throws InvalidTopicExpressionFault
+     * @throws TopicNotSupportedFault
+     * @throws ResourceUnknownFault
+     * @throws PublisherRegistrationFailedFault
+     */
+    @WebMethod(operationName = "RegisterPublisher")
+    @WebResult(name = "RegisterPublisherResponse", targetNamespace = "http://docs.oasis-open.org/wsn/br-1", partName = "RegisterPublisherResponse")
+    public RegisterPublisherResponse registerPublisher(
+        @WebParam(name = "RegisterPublisher", targetNamespace = "http://docs.oasis-open.org/wsn/br-1", partName = "RegisterPublisherRequest")
+        RegisterPublisher registerPublisherRequest)
+        throws InvalidTopicExpressionFault, PublisherRegistrationFailedFault, PublisherRegistrationRejectedFault, ResourceUnknownFault, TopicNotSupportedFault {
+    	
+    	log.debug("RegisterPublisher");
+        return handleRegisterPublisher(registerPublisherRequest, null);
+    }
+    
+    public RegisterPublisherResponse handleRegisterPublisher(
+                        RegisterPublisher registerPublisherRequest,
+                        EndpointManager manager) throws InvalidTopicExpressionFault, 
+                                                        PublisherRegistrationFailedFault, 
+                                                        PublisherRegistrationRejectedFault, 
+                                                        ResourceUnknownFault, 
+                                                        TopicNotSupportedFault {
+        AbstractPublisher publisher = null;
+        boolean success = false;
+        try {
+            publisher = createPublisher(idGenerator.generateSanitizedId());
+            publishers.put(publisher.getAddress(), publisher);
+            if (manager != null) {
+                publisher.setManager(manager);
+            }
+            publisher.register();
+            publisher.create(registerPublisherRequest);
+            RegisterPublisherResponse response = new RegisterPublisherResponse(); 
+            response.setPublisherRegistrationReference(createEndpointReference(publisher.getAddress()));
+            success = true;
+            return response;
+        } catch (EndpointRegistrationException e) {
+            PublisherRegistrationFailedFaultType fault = new PublisherRegistrationFailedFaultType();
+            throw new PublisherRegistrationFailedFault("Unable to register new endpoint", fault, e);
+        } finally {
+            if (!success && publisher != null) {
+                publishers.remove(publisher.getAddress());
+                try {
+                    publisher.destroy();
+                } catch (ResourceNotDestroyedFault e) {
+                    log.info("Error destroying publisher", e);
+                }
+            }
+        }
+    }
+
+	protected abstract AbstractPublisher createPublisher(String name);
+	
+	protected abstract AbstractSubscription createSubcription(String name);
+
+}

Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractNotificationBroker.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractPublisher.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractPublisher.java?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractPublisher.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractPublisher.java Tue Feb 21 15:40:05 2006
@@ -1,114 +1,114 @@
-/*
- * Copyright 2005-2006 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicemix.wsn;
-
-import java.util.List;
-
-import javax.jws.WebMethod;
-import javax.jws.WebParam;
-import javax.jws.WebResult;
-import javax.jws.WebService;
-
-import org.oasis_open.docs.wsn.b_2.InvalidTopicExpressionFaultType;
-import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
-import org.oasis_open.docs.wsn.b_2.TopicExpressionType;
-import org.oasis_open.docs.wsn.br_2.DestroyRegistration;
-import org.oasis_open.docs.wsn.br_2.DestroyRegistrationResponse;
-import org.oasis_open.docs.wsn.br_2.PublisherRegistrationFailedFaultType;
-import org.oasis_open.docs.wsn.br_2.RegisterPublisher;
-import org.oasis_open.docs.wsn.br_2.ResourceNotDestroyedFaultType;
-import org.apache.servicemix.wsn.jaxws.InvalidTopicExpressionFault;
-import org.apache.servicemix.wsn.jaxws.PublisherRegistrationFailedFault;
-import org.apache.servicemix.wsn.jaxws.PublisherRegistrationManager;
-import org.apache.servicemix.wsn.jaxws.PublisherRegistrationRejectedFault;
-import org.apache.servicemix.wsn.jaxws.ResourceNotDestroyedFault;
-import org.apache.servicemix.wsn.jaxws.ResourceUnknownFault;
-import org.apache.servicemix.wsn.jaxws.TopicNotSupportedFault;
-import org.w3._2005._08.addressing.EndpointReferenceType;
-
-@WebService(endpointInterface = "org.apache.servicemix.wsn.jaxws.PublisherRegistrationManager")
-public abstract class AbstractPublisher extends AbstractEndpoint 
-									    implements PublisherRegistrationManager {
-
-	protected EndpointReferenceType publisherReference;
-	protected boolean demand;
-	protected List<TopicExpressionType> topic;
-	
-	public AbstractPublisher(String name) {
-		super(name);
-	}
-	
-    /**
-     * 
-     * @param destroyRequest
-     * @return
-     *     returns org.oasis_open.docs.wsn.br_1.DestroyResponse
-     * @throws ResourceNotDestroyedFault
-     * @throws ResourceUnknownFault
-     */
-    @WebMethod(operationName = "DestroyRegistration")
-    @WebResult(name = "DestroyRegistrationResponse", targetNamespace = "http://docs.oasis-open.org/wsn/br-2", partName = "DestroyRegistrationResponse")
-    public DestroyRegistrationResponse destroyRegistration(
-        @WebParam(name = "DestroyRegistration", targetNamespace = "http://docs.oasis-open.org/wsn/br-2", partName = "DestroyRegistrationRequest")
-        DestroyRegistration destroyRegistrationRequest)
-        throws ResourceNotDestroyedFault, ResourceUnknownFault {
-    	
-    	destroy();
-    	return new DestroyRegistrationResponse();
-    }
-    
-    public abstract void notify(NotificationMessageHolderType messageHolder);
-
-    protected void destroy() throws ResourceNotDestroyedFault {
-    	try {
-    		unregister();
-    	} catch (EndpointRegistrationException e) {
-    		ResourceNotDestroyedFaultType fault = new ResourceNotDestroyedFaultType();
-    		throw new ResourceNotDestroyedFault("Error unregistering endpoint", fault, e);
-    	}
-    }
-
-	protected String createAddress() {
-		return "http://servicemix.org/wsnotification/Publisher/" + getName();
-	}
-
-	public void create(RegisterPublisher registerPublisherRequest) throws InvalidTopicExpressionFault, PublisherRegistrationFailedFault, PublisherRegistrationRejectedFault, ResourceUnknownFault, TopicNotSupportedFault {
-		validatePublisher(registerPublisherRequest);
-		start();
-	}
-	
-	protected void validatePublisher(RegisterPublisher registerPublisherRequest) throws InvalidTopicExpressionFault, PublisherRegistrationFailedFault, PublisherRegistrationRejectedFault, ResourceUnknownFault, TopicNotSupportedFault {
-		// Check consumer reference
-		publisherReference = registerPublisherRequest.getPublisherReference();
-		// Check topic
-		topic = registerPublisherRequest.getTopic();
-		// Check demand based
-		demand = registerPublisherRequest.isDemand() != null ? registerPublisherRequest.isDemand().booleanValue() : false;
-		// Check all parameters
-		if (publisherReference == null) {
-			PublisherRegistrationFailedFaultType fault = new PublisherRegistrationFailedFaultType();
-			throw new PublisherRegistrationFailedFault("Invalid PublisherReference: null", fault);
-		}
-		if (demand) {
-			if (topic == null || topic.size() == 0) {
-				InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType();
-				throw new InvalidTopicExpressionFault("Must specify at least one topic for demand-based publishing", fault);
-			}
-		}
-	}
-	
-	protected abstract void start() throws PublisherRegistrationFailedFault;
-}
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.wsn;
+
+import java.util.List;
+
+import javax.jws.WebMethod;
+import javax.jws.WebParam;
+import javax.jws.WebResult;
+import javax.jws.WebService;
+
+import org.oasis_open.docs.wsn.b_2.InvalidTopicExpressionFaultType;
+import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
+import org.oasis_open.docs.wsn.b_2.TopicExpressionType;
+import org.oasis_open.docs.wsn.br_2.DestroyRegistration;
+import org.oasis_open.docs.wsn.br_2.DestroyRegistrationResponse;
+import org.oasis_open.docs.wsn.br_2.PublisherRegistrationFailedFaultType;
+import org.oasis_open.docs.wsn.br_2.RegisterPublisher;
+import org.oasis_open.docs.wsn.br_2.ResourceNotDestroyedFaultType;
+import org.apache.servicemix.wsn.jaxws.InvalidTopicExpressionFault;
+import org.apache.servicemix.wsn.jaxws.PublisherRegistrationFailedFault;
+import org.apache.servicemix.wsn.jaxws.PublisherRegistrationManager;
+import org.apache.servicemix.wsn.jaxws.PublisherRegistrationRejectedFault;
+import org.apache.servicemix.wsn.jaxws.ResourceNotDestroyedFault;
+import org.apache.servicemix.wsn.jaxws.ResourceUnknownFault;
+import org.apache.servicemix.wsn.jaxws.TopicNotSupportedFault;
+import org.w3._2005._08.addressing.EndpointReferenceType;
+
+@WebService(endpointInterface = "org.apache.servicemix.wsn.jaxws.PublisherRegistrationManager")
+public abstract class AbstractPublisher extends AbstractEndpoint 
+									    implements PublisherRegistrationManager { // Base class for publisher registrations; subclasses supply notify() and start()
+
+	protected EndpointReferenceType publisherReference; // assigned in validatePublisher() from the request's getPublisherReference()
+	protected boolean demand; // assigned in validatePublisher(); false when the request's isDemand() is null
+	protected List<TopicExpressionType> topic; // assigned in validatePublisher() from the request's getTopic()
+	
+	public AbstractPublisher(String name) { // passes name straight to the AbstractEndpoint constructor
+		super(name);
+	}
+	
+    /**
+     * Handles the DestroyRegistration operation by calling destroy().
+     * @param destroyRegistrationRequest the incoming request; not read by this method
+     * @return
+     *     returns a new org.oasis_open.docs.wsn.br_2.DestroyRegistrationResponse
+     * @throws ResourceNotDestroyedFault if destroy() fails to unregister the endpoint
+     * @throws ResourceUnknownFault declared in the signature; not thrown by the code visible here
+     */
+    @WebMethod(operationName = "DestroyRegistration")
+    @WebResult(name = "DestroyRegistrationResponse", targetNamespace = "http://docs.oasis-open.org/wsn/br-2", partName = "DestroyRegistrationResponse")
+    public DestroyRegistrationResponse destroyRegistration(
+        @WebParam(name = "DestroyRegistration", targetNamespace = "http://docs.oasis-open.org/wsn/br-2", partName = "DestroyRegistrationRequest")
+        DestroyRegistration destroyRegistrationRequest)
+        throws ResourceNotDestroyedFault, ResourceUnknownFault {
+    	
+    	destroy();
+    	return new DestroyRegistrationResponse();
+    }
+    
+    public abstract void notify(NotificationMessageHolderType messageHolder); // subclass hook; NOTE(review): delivery semantics are not visible in this class
+
+    protected void destroy() throws ResourceNotDestroyedFault { // unregisters this endpoint, reporting a registration failure as ResourceNotDestroyedFault
+    	try {
+    		unregister(); // not defined in this class; presumably inherited from AbstractEndpoint - confirm
+    	} catch (EndpointRegistrationException e) {
+    		ResourceNotDestroyedFaultType fault = new ResourceNotDestroyedFaultType();
+    		throw new ResourceNotDestroyedFault("Error unregistering endpoint", fault, e); // the original exception is passed along as the third argument
+    	}
+    }
+
+	protected String createAddress() { // the address is the fixed Publisher URL prefix followed by getName()
+		return "http://servicemix.org/wsnotification/Publisher/" + getName();
+	}
+
+	public void create(RegisterPublisher registerPublisherRequest) throws InvalidTopicExpressionFault, PublisherRegistrationFailedFault, PublisherRegistrationRejectedFault, ResourceUnknownFault, TopicNotSupportedFault { // validates the request first, then calls start()
+		validatePublisher(registerPublisherRequest);
+		start();
+	}
+	
+	protected void validatePublisher(RegisterPublisher registerPublisherRequest) throws InvalidTopicExpressionFault, PublisherRegistrationFailedFault, PublisherRegistrationRejectedFault, ResourceUnknownFault, TopicNotSupportedFault {
+		// Record the publisher reference (checked for null below)
+		publisherReference = registerPublisherRequest.getPublisherReference();
+		// Record the topic list (checked below only for demand-based registration)
+		topic = registerPublisherRequest.getTopic();
+		// Record the demand flag; a null isDemand() is treated as false
+		demand = registerPublisherRequest.isDemand() != null ? registerPublisherRequest.isDemand().booleanValue() : false;
+		// Validate only after the fields are assigned, so they keep the request's values even when a fault is thrown
+		if (publisherReference == null) {
+			PublisherRegistrationFailedFaultType fault = new PublisherRegistrationFailedFaultType();
+			throw new PublisherRegistrationFailedFault("Invalid PublisherReference: null", fault);
+		}
+		if (demand) {
+			if (topic == null || topic.size() == 0) {
+				InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType();
+				throw new InvalidTopicExpressionFault("Must specify at least one topic for demand-based publishing", fault);
+			}
+		}
+	}
+	
+	protected abstract void start() throws PublisherRegistrationFailedFault; // subclass hook invoked by create() once validatePublisher() has returned normally
+}

Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractPublisher.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractPullPoint.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractPullPoint.java?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractPullPoint.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractPullPoint.java Tue Feb 21 15:40:05 2006
@@ -1,139 +1,139 @@
-/*
- * Copyright 2005-2006 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicemix.wsn;
-
-import java.math.BigInteger;
-import java.util.List;
-
-import javax.jws.Oneway;
-import javax.jws.WebMethod;
-import javax.jws.WebParam;
-import javax.jws.WebResult;
-import javax.jws.WebService;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.oasis_open.docs.wsn.b_2.CreatePullPoint;
-import org.oasis_open.docs.wsn.b_2.DestroyPullPoint;
-import org.oasis_open.docs.wsn.b_2.DestroyPullPointResponse;
-import org.oasis_open.docs.wsn.b_2.GetMessages;
-import org.oasis_open.docs.wsn.b_2.GetMessagesResponse;
-import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
-import org.oasis_open.docs.wsn.b_2.Notify;
-import org.oasis_open.docs.wsn.b_2.UnableToDestroyPullPointFaultType;
-import org.apache.servicemix.wsn.jaxws.NotificationConsumer;
-import org.apache.servicemix.wsn.jaxws.PullPoint;
-import org.apache.servicemix.wsn.jaxws.ResourceUnknownFault;
-import org.apache.servicemix.wsn.jaxws.UnableToCreatePullPointFault;
-import org.apache.servicemix.wsn.jaxws.UnableToDestroyPullPointFault;
-
-@WebService(endpointInterface = "org.apache.servicemix.wsn.PullPointConsumer")
-public abstract class AbstractPullPoint extends AbstractEndpoint 
-										implements PullPoint, NotificationConsumer {
-
-	private static Log log = LogFactory.getLog(AbstractPullPoint.class);
-	
-    protected AbstractCreatePullPoint createPullPoint;
-    
-	public AbstractPullPoint(String name) {
-		super(name);
-	}
-	
-    /**
-     * 
-     * @param notify
-     */
-    @WebMethod(operationName = "Notify")
-    @Oneway
-    public void notify(
-        @WebParam(name = "Notify", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "Notify")
-        Notify notify) {
-    
-    	log.debug("Notify");
-    	for (NotificationMessageHolderType messageHolder : notify.getNotificationMessage()) {
-    		store(messageHolder);
-    	}
-    }
-    
-    /**
-     * 
-     * @param getMessagesRequest
-     * @return
-     *     returns org.oasis_open.docs.wsn.b_1.GetMessagesResponse
-     * @throws ResourceUnknownFault
-     */
-    @WebMethod(operationName = "GetMessages")
-    @WebResult(name = "GetMessagesResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "GetMessagesResponse")
-    public GetMessagesResponse getMessages(
-        @WebParam(name = "GetMessages", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "GetMessagesRequest")
-        GetMessages getMessagesRequest)
-        throws ResourceUnknownFault {
-    	
-    	log.debug("GetMessages");
-    	BigInteger max = getMessagesRequest.getMaximumNumber();
-    	List<NotificationMessageHolderType> messages = getMessages(max != null ? max.intValue() : 0);
-    	GetMessagesResponse response = new GetMessagesResponse();
-    	response.getNotificationMessage().addAll(messages);
-    	return response;
-    }
-    
-    /**
-     * 
-     * @param destroyRequest
-     * @return
-     *     returns org.oasis_open.docs.wsn.b_1.DestroyResponse
-     * @throws UnableToDestroyPullPoint
-     */
-    @WebMethod(operationName = "DestroyPullPoint")
-    @WebResult(name = "DestroyPullPointResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "DestroyPullPointResponse")
-    public DestroyPullPointResponse destroyPullPoint(
-        @WebParam(name = "DestroyPullPoint", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "DestroyPullPointRequest")
-        DestroyPullPoint destroyPullPointRequest)
-        throws UnableToDestroyPullPointFault {
-    	
-    	log.debug("Destroy");
-        createPullPoint.destroyPullPoint(getAddress());
-    	return new DestroyPullPointResponse();
-    }
-    
-    public void create(CreatePullPoint createPullPointRequest) throws UnableToCreatePullPointFault {
-    }
-    
-	protected abstract void store(NotificationMessageHolderType messageHolder);
-
-    protected abstract List<NotificationMessageHolderType> getMessages(int max) throws ResourceUnknownFault;
-
-    protected void destroy() throws UnableToDestroyPullPointFault {
-    	try {
-    		unregister();
-    	} catch (EndpointRegistrationException e) {
-    		UnableToDestroyPullPointFaultType fault = new UnableToDestroyPullPointFaultType();
-    		throw new UnableToDestroyPullPointFault("Error unregistering endpoint", fault, e);
-    	}
-    }
-
-	protected String createAddress() {
-		return "http://servicemix.org/wsnotification/PullPoint/" + getName();
-	}
-
-    public AbstractCreatePullPoint getCreatePullPoint() {
-        return createPullPoint;
-    }
-
-    public void setCreatePullPoint(AbstractCreatePullPoint createPullPoint) {
-        this.createPullPoint = createPullPoint;
-    }
-}
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.wsn;
+
+import java.math.BigInteger;
+import java.util.List;
+
+import javax.jws.Oneway;
+import javax.jws.WebMethod;
+import javax.jws.WebParam;
+import javax.jws.WebResult;
+import javax.jws.WebService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.oasis_open.docs.wsn.b_2.CreatePullPoint;
+import org.oasis_open.docs.wsn.b_2.DestroyPullPoint;
+import org.oasis_open.docs.wsn.b_2.DestroyPullPointResponse;
+import org.oasis_open.docs.wsn.b_2.GetMessages;
+import org.oasis_open.docs.wsn.b_2.GetMessagesResponse;
+import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
+import org.oasis_open.docs.wsn.b_2.Notify;
+import org.oasis_open.docs.wsn.b_2.UnableToDestroyPullPointFaultType;
+import org.apache.servicemix.wsn.jaxws.NotificationConsumer;
+import org.apache.servicemix.wsn.jaxws.PullPoint;
+import org.apache.servicemix.wsn.jaxws.ResourceUnknownFault;
+import org.apache.servicemix.wsn.jaxws.UnableToCreatePullPointFault;
+import org.apache.servicemix.wsn.jaxws.UnableToDestroyPullPointFault;
+
+@WebService(endpointInterface = "org.apache.servicemix.wsn.PullPointConsumer")
+public abstract class AbstractPullPoint extends AbstractEndpoint 
+										implements PullPoint, NotificationConsumer { // Base class for pull points; subclasses supply store() and getMessages(int)
+
+	private static Log log = LogFactory.getLog(AbstractPullPoint.class);
+	
+    protected AbstractCreatePullPoint createPullPoint; // assigned only through setCreatePullPoint(); dereferenced by destroyPullPoint()
+    
+	public AbstractPullPoint(String name) { // passes name straight to the AbstractEndpoint constructor
+		super(name);
+	}
+	
+    /**
+     * Passes every message holder carried by the Notify request to store().
+     * @param notify the request whose getNotificationMessage() entries are stored
+     */
+    @WebMethod(operationName = "Notify")
+    @Oneway
+    public void notify(
+        @WebParam(name = "Notify", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "Notify") // NOTE(review): namespace is wsn/b-1 here and in GetMessages, but wsn/b-2 in DestroyPullPoint - confirm intended
+        Notify notify) {
+    
+    	log.debug("Notify");
+    	for (NotificationMessageHolderType messageHolder : notify.getNotificationMessage()) {
+    		store(messageHolder);
+    	}
+    }
+    
+    /**
+     * Returns the messages obtained from getMessages(int) in a new response.
+     * @param getMessagesRequest request whose getMaximumNumber() may be null
+     * @return
+     *     returns org.oasis_open.docs.wsn.b_2.GetMessagesResponse holding those messages
+     * @throws ResourceUnknownFault if getMessages(int) throws it
+     */
+    @WebMethod(operationName = "GetMessages")
+    @WebResult(name = "GetMessagesResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "GetMessagesResponse")
+    public GetMessagesResponse getMessages(
+        @WebParam(name = "GetMessages", targetNamespace = "http://docs.oasis-open.org/wsn/b-1", partName = "GetMessagesRequest")
+        GetMessages getMessagesRequest)
+        throws ResourceUnknownFault {
+    	
+    	log.debug("GetMessages");
+    	BigInteger max = getMessagesRequest.getMaximumNumber();
+    	List<NotificationMessageHolderType> messages = getMessages(max != null ? max.intValue() : 0); // 0 is passed when no maximum was given; intValue() keeps only the low-order 32 bits of larger values
+    	GetMessagesResponse response = new GetMessagesResponse();
+    	response.getNotificationMessage().addAll(messages);
+    	return response;
+    }
+    
+    /**
+     * Delegates destruction to createPullPoint.destroyPullPoint(getAddress()).
+     * @param destroyPullPointRequest the incoming request; not read by this method
+     * @return
+     *     returns a new org.oasis_open.docs.wsn.b_2.DestroyPullPointResponse
+     * @throws UnableToDestroyPullPointFault declared in the signature; presumably raised by the delegate
+     */
+    @WebMethod(operationName = "DestroyPullPoint")
+    @WebResult(name = "DestroyPullPointResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "DestroyPullPointResponse")
+    public DestroyPullPointResponse destroyPullPoint(
+        @WebParam(name = "DestroyPullPoint", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "DestroyPullPointRequest")
+        DestroyPullPoint destroyPullPointRequest)
+        throws UnableToDestroyPullPointFault {
+    	
+    	log.debug("Destroy");
+        createPullPoint.destroyPullPoint(getAddress()); // NOTE(review): no null check; fails with NullPointerException if setCreatePullPoint() was never called
+    	return new DestroyPullPointResponse();
+    }
+    
+    public void create(CreatePullPoint createPullPointRequest) throws UnableToCreatePullPointFault { // no-op in this base class; the request is not read
+    }
+    
+	protected abstract void store(NotificationMessageHolderType messageHolder); // subclass hook called by notify() once per message holder
+
+    protected abstract List<NotificationMessageHolderType> getMessages(int max) throws ResourceUnknownFault; // subclass hook; receives 0 when the request carried no maximum - NOTE(review): the meaning of 0 is left to the subclass
+
+    protected void destroy() throws UnableToDestroyPullPointFault { // not called from within this class; NOTE(review): presumably invoked by AbstractCreatePullPoint - confirm
+    	try {
+    		unregister(); // not defined in this class; presumably inherited from AbstractEndpoint - confirm
+    	} catch (EndpointRegistrationException e) {
+    		UnableToDestroyPullPointFaultType fault = new UnableToDestroyPullPointFaultType();
+    		throw new UnableToDestroyPullPointFault("Error unregistering endpoint", fault, e); // the original exception is passed along as the third argument
+    	}
+    }
+
+	protected String createAddress() { // the address is the fixed PullPoint URL prefix followed by getName()
+		return "http://servicemix.org/wsnotification/PullPoint/" + getName();
+	}
+
+    public AbstractCreatePullPoint getCreatePullPoint() {
+        return createPullPoint;
+    }
+
+    public void setCreatePullPoint(AbstractCreatePullPoint createPullPoint) {
+        this.createPullPoint = createPullPoint;
+    }
+}

Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractPullPoint.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractSubscription.java?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractSubscription.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractSubscription.java Tue Feb 21 15:40:05 2006
@@ -1,372 +1,372 @@
-/*
- * Copyright 2005-2006 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicemix.wsn;
-
-import java.util.GregorianCalendar;
-
-import javax.jws.WebMethod;
-import javax.jws.WebParam;
-import javax.jws.WebResult;
-import javax.jws.WebService;
-import javax.xml.bind.JAXBElement;
-import javax.xml.datatype.DatatypeConfigurationException;
-import javax.xml.datatype.DatatypeConstants;
-import javax.xml.datatype.DatatypeFactory;
-import javax.xml.datatype.Duration;
-import javax.xml.datatype.XMLGregorianCalendar;
-import javax.xml.namespace.QName;
-
-import org.oasis_open.docs.wsn.b_2.InvalidFilterFaultType;
-import org.oasis_open.docs.wsn.b_2.InvalidMessageContentExpressionFaultType;
-import org.oasis_open.docs.wsn.b_2.InvalidProducerPropertiesExpressionFaultType;
-import org.oasis_open.docs.wsn.b_2.InvalidTopicExpressionFaultType;
-import org.oasis_open.docs.wsn.b_2.PauseSubscription;
-import org.oasis_open.docs.wsn.b_2.PauseSubscriptionResponse;
-import org.oasis_open.docs.wsn.b_2.QueryExpressionType;
-import org.oasis_open.docs.wsn.b_2.Renew;
-import org.oasis_open.docs.wsn.b_2.RenewResponse;
-import org.oasis_open.docs.wsn.b_2.ResumeSubscription;
-import org.oasis_open.docs.wsn.b_2.ResumeSubscriptionResponse;
-import org.oasis_open.docs.wsn.b_2.Subscribe;
-import org.oasis_open.docs.wsn.b_2.SubscribeCreationFailedFaultType;
-import org.oasis_open.docs.wsn.b_2.TopicExpressionType;
-import org.oasis_open.docs.wsn.b_2.UnableToDestroySubscriptionFaultType;
-import org.oasis_open.docs.wsn.b_2.UnacceptableInitialTerminationTimeFaultType;
-import org.oasis_open.docs.wsn.b_2.UnacceptableTerminationTimeFaultType;
-import org.oasis_open.docs.wsn.b_2.Unsubscribe;
-import org.oasis_open.docs.wsn.b_2.UnsubscribeResponse;
-import org.oasis_open.docs.wsn.b_2.UseRaw;
-import org.apache.servicemix.wsn.jaxws.InvalidFilterFault;
-import org.apache.servicemix.wsn.jaxws.InvalidMessageContentExpressionFault;
-import org.apache.servicemix.wsn.jaxws.InvalidProducerPropertiesExpressionFault;
-import org.apache.servicemix.wsn.jaxws.InvalidTopicExpressionFault;
-import org.apache.servicemix.wsn.jaxws.PausableSubscriptionManager;
-import org.apache.servicemix.wsn.jaxws.PauseFailedFault;
-import org.apache.servicemix.wsn.jaxws.ResourceUnknownFault;
-import org.apache.servicemix.wsn.jaxws.ResumeFailedFault;
-import org.apache.servicemix.wsn.jaxws.SubscribeCreationFailedFault;
-import org.apache.servicemix.wsn.jaxws.TopicExpressionDialectUnknownFault;
-import org.apache.servicemix.wsn.jaxws.TopicNotSupportedFault;
-import org.apache.servicemix.wsn.jaxws.UnableToDestroySubscriptionFault;
-import org.apache.servicemix.wsn.jaxws.UnacceptableInitialTerminationTimeFault;
-import org.apache.servicemix.wsn.jaxws.UnacceptableTerminationTimeFault;
-import org.w3._2005._08.addressing.EndpointReferenceType;
-
-@WebService(endpointInterface = "org.apache.servicemix.wsn.jaxws.PausableSubscriptionManager")
-public abstract class AbstractSubscription extends AbstractEndpoint 
-										   implements PausableSubscriptionManager {
-
-	public static final String WSN_URI = "http://docs.oasis-open.org/wsn/b-2";
-	public static final String XPATH1_URI = "http://www.w3.org/TR/1999/REC-xpath-19991116";
-	public static final QName QNAME_TOPIC_EXPRESSION = new QName(WSN_URI, "TopicExpression");
-	public static final QName QNAME_PRODUCER_PROPERTIES = new QName(WSN_URI, "ProducerProperties");
-	public static final QName QNAME_MESSAGE_CONTENT = new QName(WSN_URI, "MessageContent");
-	public static final QName QNAME_USE_RAW = new QName(WSN_URI, "UseRaw");
-	
-	protected DatatypeFactory datatypeFactory;
-	protected XMLGregorianCalendar terminationTime;
-	protected boolean useRaw;
-	protected TopicExpressionType topic;
-	protected QueryExpressionType contentFilter;
-	protected EndpointReferenceType consumerReference;
-    protected AbstractNotificationBroker broker;
-	
-	public AbstractSubscription(String name) {
-		super(name);
-		try {
-			this.datatypeFactory = DatatypeFactory.newInstance();
-		} catch (DatatypeConfigurationException e) {
-			throw new RuntimeException("Unable to initialize subscription", e);
-		}
-	}
-	
-    /**
-     * 
-     * @param renewRequest
-     * @return
-     *     returns org.oasis_open.docs.wsn.b_1.RenewResponse
-     * @throws UnacceptableTerminationTimeFault
-     * @throws ResourceUnknownFault
-     */
-    @WebMethod(operationName = "Renew")
-    @WebResult(name = "RenewResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "RenewResponse")
-    public RenewResponse renew(
-        @WebParam(name = "Renew", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "RenewRequest")
-        Renew renewRequest)
-        throws ResourceUnknownFault, UnacceptableTerminationTimeFault {
-    	
-    	XMLGregorianCalendar terminationTime = validateTerminationTime(renewRequest.getTerminationTime());
-    	renew(terminationTime);
-    	RenewResponse response = new RenewResponse();
-    	response.setTerminationTime(terminationTime);
-    	response.setCurrentTime(getCurrentTime());
-    	return response;
-    }
-    
-    /**
-     * 
-     * @param unsubscribeRequest
-     * @return
-     *     returns org.oasis_open.docs.wsn.b_1.UnsubscribeResponse
-     * @throws UnableToDestroySubscriptionFault
-     * @throws ResourceUnknownFault
-     */
-    @WebMethod(operationName = "Unsubscribe")
-    @WebResult(name = "UnsubscribeResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "UnsubscribeResponse")
-    public UnsubscribeResponse unsubscribe(
-        @WebParam(name = "Unsubscribe", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "UnsubscribeRequest")
-        Unsubscribe unsubscribeRequest)
-        throws ResourceUnknownFault, UnableToDestroySubscriptionFault {
-
-    	broker.unsubscribe(getAddress());
-    	return new UnsubscribeResponse();
-    }
-    
-    /**
-     * 
-     * @param pauseSubscriptionRequest
-     * @return
-     *     returns org.oasis_open.docs.wsn.b_1.PauseSubscriptionResponse
-     * @throws PauseFailedFault
-     * @throws ResourceUnknownFault
-     */
-    @WebMethod(operationName = "PauseSubscription")
-    @WebResult(name = "PauseSubscriptionResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "PauseSubscriptionResponse")
-    public PauseSubscriptionResponse pauseSubscription(
-        @WebParam(name = "PauseSubscription", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "PauseSubscriptionRequest")
-        PauseSubscription pauseSubscriptionRequest)
-        throws PauseFailedFault, ResourceUnknownFault {
-
-    	pause();
-    	return new PauseSubscriptionResponse();
-    }
-
-    /**
-     * 
-     * @param resumeSubscriptionRequest
-     * @return
-     *     returns org.oasis_open.docs.wsn.b_1.ResumeSubscriptionResponse
-     * @throws ResumeFailedFault
-     * @throws ResourceUnknownFault
-     */
-    @WebMethod(operationName = "ResumeSubscription")
-    @WebResult(name = "ResumeSubscriptionResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "ResumeSubscriptionResponse")
-    public ResumeSubscriptionResponse resumeSubscription(
-        @WebParam(name = "ResumeSubscription", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "ResumeSubscriptionRequest")
-        ResumeSubscription resumeSubscriptionRequest)
-        throws ResourceUnknownFault, ResumeFailedFault {
-
-    	resume();
-    	return new ResumeSubscriptionResponse();
-    }
-    
-    protected XMLGregorianCalendar validateInitialTerminationTime(String value) throws UnacceptableInitialTerminationTimeFault {
-    	XMLGregorianCalendar tt = parseTerminationTime(value);
-    	if (tt == null) {
-    		UnacceptableInitialTerminationTimeFaultType fault = new UnacceptableInitialTerminationTimeFaultType();
-        	throw new UnacceptableInitialTerminationTimeFault(
-        			"Unable to parse initial termination time: '" + value + "'",
-        			fault);
-    	}
-    	XMLGregorianCalendar ct = getCurrentTime();
-    	int c = tt.compare(ct);
-    	if (c == DatatypeConstants.LESSER || c == DatatypeConstants.EQUAL) {
-    		UnacceptableInitialTerminationTimeFaultType fault = new UnacceptableInitialTerminationTimeFaultType();
-        	fault.setMinimumTime(ct);
-        	throw new UnacceptableInitialTerminationTimeFault(
-        			"Invalid initial termination time",
-        			fault);
-    	}
-    	return tt;
-    }
-    
-    protected XMLGregorianCalendar validateTerminationTime(String value) throws UnacceptableTerminationTimeFault {
-    	XMLGregorianCalendar tt = parseTerminationTime(value);
-    	if (tt == null) {
-        	UnacceptableTerminationTimeFaultType fault = new UnacceptableTerminationTimeFaultType();
-        	throw new UnacceptableTerminationTimeFault(
-        			"Unable to parse termination time: '" + value + "'",
-        			fault);
-    	}
-    	XMLGregorianCalendar ct = getCurrentTime();
-    	int c = tt.compare(ct);
-    	if (c == DatatypeConstants.LESSER || c == DatatypeConstants.EQUAL) {
-        	UnacceptableTerminationTimeFaultType fault = new UnacceptableTerminationTimeFaultType();
-        	fault.setMinimumTime(ct);
-        	throw new UnacceptableTerminationTimeFault(
-        			"Invalid termination time",
-        			fault);
-    	}
-    	return tt;
-    }
-    
-    protected XMLGregorianCalendar parseTerminationTime(String value) {
-    	try {
-    		Duration d = datatypeFactory.newDuration(value);
-    		XMLGregorianCalendar c = getCurrentTime();
-    		c.add(d);
-    		return c;
-    	} catch (Exception e) { }
-    	try {
-    		Duration d = datatypeFactory.newDurationDayTime(value);
-    		XMLGregorianCalendar c = getCurrentTime();
-    		c.add(d);
-    		return c;
-    	} catch (Exception e) { }
-    	try {
-    		Duration d = datatypeFactory.newDurationYearMonth(value);
-    		XMLGregorianCalendar c = getCurrentTime();
-    		c.add(d);
-    		return c;
-    	} catch (Exception e) { }
-    	try {
-    		return datatypeFactory.newXMLGregorianCalendar(value);
-    	} catch (Exception e) { }
-    	return null;
-    }
-    
-    protected XMLGregorianCalendar getCurrentTime() {
-    	return datatypeFactory.newXMLGregorianCalendar(new GregorianCalendar());
-    }
-
-	public XMLGregorianCalendar getTerminationTime() {
-		return terminationTime;
-	}
-
-	public void setTerminationTime(XMLGregorianCalendar terminationTime) {
-		this.terminationTime = terminationTime;
-	}
-	
-	public void create(Subscribe subscribeRequest) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
-		validateSubscription(subscribeRequest);
-		start();
-	}
-	
-	protected abstract void start() throws SubscribeCreationFailedFault;
-	
-	protected abstract void pause() throws PauseFailedFault;
-	
-    protected abstract void resume() throws ResumeFailedFault;
-
-    protected abstract void renew(XMLGregorianCalendar terminationTime) throws UnacceptableTerminationTimeFault;
-
-    protected void unsubscribe() throws UnableToDestroySubscriptionFault {
-    	try {
-    		unregister();
-    	} catch (EndpointRegistrationException e) {
-    		UnableToDestroySubscriptionFaultType fault = new UnableToDestroySubscriptionFaultType();
-    		throw new UnableToDestroySubscriptionFault("Error unregistering endpoint", fault, e);
-    	}
-    }
-
-	protected String createAddress() {
-		return "http://servicemix.org/wsnotification/Subscription/" + getName();
-	}
-
-	protected void validateSubscription(Subscribe subscribeRequest) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
-		// Check consumer reference
-		consumerReference = subscribeRequest.getConsumerReference();
-		// Check terminationTime
-		if (subscribeRequest.getInitialTerminationTime() != null &&
-			subscribeRequest.getInitialTerminationTime().isNil() == false &&
-			subscribeRequest.getInitialTerminationTime().getValue() != null) {
-			String strTerminationTime = subscribeRequest.getInitialTerminationTime().getValue();
-			terminationTime = validateInitialTerminationTime(strTerminationTime.trim());
-		}
-		// Check filter
-		if (subscribeRequest.getFilter() != null) {
-			for (Object f : subscribeRequest.getFilter().getAny()) {
-				JAXBElement e = null;
-				if (f instanceof JAXBElement) {
-					e = (JAXBElement) f;
-					f = e.getValue();
-				}
-				if (f instanceof TopicExpressionType) {
-					if (!e.getName().equals(QNAME_TOPIC_EXPRESSION)) {
-						InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType();
-						throw new InvalidTopicExpressionFault("Unrecognized TopicExpression: " + e, fault);
-					}
-					topic = (TopicExpressionType) f;
-				} else if (f instanceof QueryExpressionType) {
-					if (e != null && e.getName().equals(QNAME_PRODUCER_PROPERTIES)) {
-						InvalidProducerPropertiesExpressionFaultType fault = new InvalidProducerPropertiesExpressionFaultType();
-						throw new InvalidProducerPropertiesExpressionFault("ProducerProperties are not supported", fault);
-					} else if (e != null && e.getName().equals(QNAME_MESSAGE_CONTENT)) {
-						if (contentFilter != null) {
-							InvalidMessageContentExpressionFaultType fault = new InvalidMessageContentExpressionFaultType();
-							throw new InvalidMessageContentExpressionFault("Only one MessageContent filter can be specified", fault);
-						}
-						contentFilter = (QueryExpressionType) f;
-						// Defaults to XPath 1.0
-						if (contentFilter.getDialect() == null) {
-							contentFilter.setDialect(XPATH1_URI);
-						}
-					} else {
-						InvalidFilterFaultType fault = new InvalidFilterFaultType();
-						throw new InvalidFilterFault("Unrecognized filter: " + (e != null ? e.getName() : f), fault);
-					}
-				} else {
-					InvalidFilterFaultType fault = new InvalidFilterFaultType();
-					throw new InvalidFilterFault("Unrecognized filter: " + (e != null ? e.getName() : f), fault);
-				}
-			}
-		}
-		// Check policy
-		if (subscribeRequest.getSubscriptionPolicy() != null) {
-			for (Object p : subscribeRequest.getSubscriptionPolicy().getAny()) {
-				JAXBElement e = null;
-				if (p instanceof JAXBElement) {
-					e = (JAXBElement) p;
-					p = e.getValue();
-				}
-				if (p instanceof UseRaw) {
-					useRaw = true;
-				} else {
-					InvalidFilterFaultType fault = new InvalidFilterFaultType();
-					throw new InvalidFilterFault("Unrecognized policy: " + p, fault);
-				}
-			}
-		}
-		// Check all parameters
-		if (consumerReference == null) {
-			SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType();
-			throw new SubscribeCreationFailedFault("Invalid ConsumerReference: null", fault);
-		}
-		// TODO check we can resolve endpoint
-		if (topic == null) {
-			InvalidFilterFaultType fault = new InvalidFilterFaultType();
-			throw new InvalidFilterFault("Must specify a topic to subscribe on", fault);
-		}
-		if (contentFilter != null && !contentFilter.getDialect().equals(XPATH1_URI)) {
-			InvalidMessageContentExpressionFaultType fault = new InvalidMessageContentExpressionFaultType();
-			throw new InvalidMessageContentExpressionFault("Unsupported MessageContent dialect: '" + contentFilter.getDialect() + "'", fault);
-		}
-		if (terminationTime != null) {
-			UnacceptableInitialTerminationTimeFaultType fault = new UnacceptableInitialTerminationTimeFaultType();
-	    	throw new UnacceptableInitialTerminationTimeFault(
-	    			"InitialTerminationTime is not supported",
-	    			fault);
-		}
-	}
-
-    public AbstractNotificationBroker getBroker() {
-        return broker;
-    }
-
-    public void setBroker(AbstractNotificationBroker broker) {
-        this.broker = broker;
-    }
-}
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.wsn;
+
+import java.util.GregorianCalendar;
+
+import javax.jws.WebMethod;
+import javax.jws.WebParam;
+import javax.jws.WebResult;
+import javax.jws.WebService;
+import javax.xml.bind.JAXBElement;
+import javax.xml.datatype.DatatypeConfigurationException;
+import javax.xml.datatype.DatatypeConstants;
+import javax.xml.datatype.DatatypeFactory;
+import javax.xml.datatype.Duration;
+import javax.xml.datatype.XMLGregorianCalendar;
+import javax.xml.namespace.QName;
+
+import org.oasis_open.docs.wsn.b_2.InvalidFilterFaultType;
+import org.oasis_open.docs.wsn.b_2.InvalidMessageContentExpressionFaultType;
+import org.oasis_open.docs.wsn.b_2.InvalidProducerPropertiesExpressionFaultType;
+import org.oasis_open.docs.wsn.b_2.InvalidTopicExpressionFaultType;
+import org.oasis_open.docs.wsn.b_2.PauseSubscription;
+import org.oasis_open.docs.wsn.b_2.PauseSubscriptionResponse;
+import org.oasis_open.docs.wsn.b_2.QueryExpressionType;
+import org.oasis_open.docs.wsn.b_2.Renew;
+import org.oasis_open.docs.wsn.b_2.RenewResponse;
+import org.oasis_open.docs.wsn.b_2.ResumeSubscription;
+import org.oasis_open.docs.wsn.b_2.ResumeSubscriptionResponse;
+import org.oasis_open.docs.wsn.b_2.Subscribe;
+import org.oasis_open.docs.wsn.b_2.SubscribeCreationFailedFaultType;
+import org.oasis_open.docs.wsn.b_2.TopicExpressionType;
+import org.oasis_open.docs.wsn.b_2.UnableToDestroySubscriptionFaultType;
+import org.oasis_open.docs.wsn.b_2.UnacceptableInitialTerminationTimeFaultType;
+import org.oasis_open.docs.wsn.b_2.UnacceptableTerminationTimeFaultType;
+import org.oasis_open.docs.wsn.b_2.Unsubscribe;
+import org.oasis_open.docs.wsn.b_2.UnsubscribeResponse;
+import org.oasis_open.docs.wsn.b_2.UseRaw;
+import org.apache.servicemix.wsn.jaxws.InvalidFilterFault;
+import org.apache.servicemix.wsn.jaxws.InvalidMessageContentExpressionFault;
+import org.apache.servicemix.wsn.jaxws.InvalidProducerPropertiesExpressionFault;
+import org.apache.servicemix.wsn.jaxws.InvalidTopicExpressionFault;
+import org.apache.servicemix.wsn.jaxws.PausableSubscriptionManager;
+import org.apache.servicemix.wsn.jaxws.PauseFailedFault;
+import org.apache.servicemix.wsn.jaxws.ResourceUnknownFault;
+import org.apache.servicemix.wsn.jaxws.ResumeFailedFault;
+import org.apache.servicemix.wsn.jaxws.SubscribeCreationFailedFault;
+import org.apache.servicemix.wsn.jaxws.TopicExpressionDialectUnknownFault;
+import org.apache.servicemix.wsn.jaxws.TopicNotSupportedFault;
+import org.apache.servicemix.wsn.jaxws.UnableToDestroySubscriptionFault;
+import org.apache.servicemix.wsn.jaxws.UnacceptableInitialTerminationTimeFault;
+import org.apache.servicemix.wsn.jaxws.UnacceptableTerminationTimeFault;
+import org.w3._2005._08.addressing.EndpointReferenceType;
+
+@WebService(endpointInterface = "org.apache.servicemix.wsn.jaxws.PausableSubscriptionManager")
+public abstract class AbstractSubscription extends AbstractEndpoint 
+										   implements PausableSubscriptionManager {
+
+	public static final String WSN_URI = "http://docs.oasis-open.org/wsn/b-2";
+	public static final String XPATH1_URI = "http://www.w3.org/TR/1999/REC-xpath-19991116";
+	public static final QName QNAME_TOPIC_EXPRESSION = new QName(WSN_URI, "TopicExpression");
+	public static final QName QNAME_PRODUCER_PROPERTIES = new QName(WSN_URI, "ProducerProperties");
+	public static final QName QNAME_MESSAGE_CONTENT = new QName(WSN_URI, "MessageContent");
+	public static final QName QNAME_USE_RAW = new QName(WSN_URI, "UseRaw");
+	
+	protected DatatypeFactory datatypeFactory;
+	protected XMLGregorianCalendar terminationTime;
+	protected boolean useRaw;
+	protected TopicExpressionType topic;
+	protected QueryExpressionType contentFilter;
+	protected EndpointReferenceType consumerReference;
+    protected AbstractNotificationBroker broker;
+	
+	public AbstractSubscription(String name) {
+		super(name);
+		try {
+			this.datatypeFactory = DatatypeFactory.newInstance();
+		} catch (DatatypeConfigurationException e) {
+			throw new RuntimeException("Unable to initialize subscription", e);
+		}
+	}
+	
+    /**
+     * 
+     * @param renewRequest
+     * @return
+     *     returns org.oasis_open.docs.wsn.b_1.RenewResponse
+     * @throws UnacceptableTerminationTimeFault
+     * @throws ResourceUnknownFault
+     */
+    @WebMethod(operationName = "Renew")
+    @WebResult(name = "RenewResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "RenewResponse")
+    public RenewResponse renew(
+        @WebParam(name = "Renew", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "RenewRequest")
+        Renew renewRequest)
+        throws ResourceUnknownFault, UnacceptableTerminationTimeFault {
+    	
+    	XMLGregorianCalendar terminationTime = validateTerminationTime(renewRequest.getTerminationTime());
+    	renew(terminationTime);
+    	RenewResponse response = new RenewResponse();
+    	response.setTerminationTime(terminationTime);
+    	response.setCurrentTime(getCurrentTime());
+    	return response;
+    }
+    
+    /**
+     * 
+     * @param unsubscribeRequest
+     * @return
+     *     returns org.oasis_open.docs.wsn.b_1.UnsubscribeResponse
+     * @throws UnableToDestroySubscriptionFault
+     * @throws ResourceUnknownFault
+     */
+    @WebMethod(operationName = "Unsubscribe")
+    @WebResult(name = "UnsubscribeResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "UnsubscribeResponse")
+    public UnsubscribeResponse unsubscribe(
+        @WebParam(name = "Unsubscribe", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "UnsubscribeRequest")
+        Unsubscribe unsubscribeRequest)
+        throws ResourceUnknownFault, UnableToDestroySubscriptionFault {
+
+    	broker.unsubscribe(getAddress());
+    	return new UnsubscribeResponse();
+    }
+    
+    /**
+     * 
+     * @param pauseSubscriptionRequest
+     * @return
+     *     returns org.oasis_open.docs.wsn.b_1.PauseSubscriptionResponse
+     * @throws PauseFailedFault
+     * @throws ResourceUnknownFault
+     */
+    @WebMethod(operationName = "PauseSubscription")
+    @WebResult(name = "PauseSubscriptionResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "PauseSubscriptionResponse")
+    public PauseSubscriptionResponse pauseSubscription(
+        @WebParam(name = "PauseSubscription", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "PauseSubscriptionRequest")
+        PauseSubscription pauseSubscriptionRequest)
+        throws PauseFailedFault, ResourceUnknownFault {
+
+    	pause();
+    	return new PauseSubscriptionResponse();
+    }
+
+    /**
+     * 
+     * @param resumeSubscriptionRequest
+     * @return
+     *     returns org.oasis_open.docs.wsn.b_1.ResumeSubscriptionResponse
+     * @throws ResumeFailedFault
+     * @throws ResourceUnknownFault
+     */
+    @WebMethod(operationName = "ResumeSubscription")
+    @WebResult(name = "ResumeSubscriptionResponse", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "ResumeSubscriptionResponse")
+    public ResumeSubscriptionResponse resumeSubscription(
+        @WebParam(name = "ResumeSubscription", targetNamespace = "http://docs.oasis-open.org/wsn/b-2", partName = "ResumeSubscriptionRequest")
+        ResumeSubscription resumeSubscriptionRequest)
+        throws ResourceUnknownFault, ResumeFailedFault {
+
+    	resume();
+    	return new ResumeSubscriptionResponse();
+    }
+    
+    protected XMLGregorianCalendar validateInitialTerminationTime(String value) throws UnacceptableInitialTerminationTimeFault {
+    	XMLGregorianCalendar tt = parseTerminationTime(value);
+    	if (tt == null) {
+    		UnacceptableInitialTerminationTimeFaultType fault = new UnacceptableInitialTerminationTimeFaultType();
+        	throw new UnacceptableInitialTerminationTimeFault(
+        			"Unable to parse initial termination time: '" + value + "'",
+        			fault);
+    	}
+    	XMLGregorianCalendar ct = getCurrentTime();
+    	int c = tt.compare(ct);
+    	if (c == DatatypeConstants.LESSER || c == DatatypeConstants.EQUAL) {
+    		UnacceptableInitialTerminationTimeFaultType fault = new UnacceptableInitialTerminationTimeFaultType();
+        	fault.setMinimumTime(ct);
+        	throw new UnacceptableInitialTerminationTimeFault(
+        			"Invalid initial termination time",
+        			fault);
+    	}
+    	return tt;
+    }
+    
+    protected XMLGregorianCalendar validateTerminationTime(String value) throws UnacceptableTerminationTimeFault {
+    	XMLGregorianCalendar tt = parseTerminationTime(value);
+    	if (tt == null) {
+        	UnacceptableTerminationTimeFaultType fault = new UnacceptableTerminationTimeFaultType();
+        	throw new UnacceptableTerminationTimeFault(
+        			"Unable to parse termination time: '" + value + "'",
+        			fault);
+    	}
+    	XMLGregorianCalendar ct = getCurrentTime();
+    	int c = tt.compare(ct);
+    	if (c == DatatypeConstants.LESSER || c == DatatypeConstants.EQUAL) {
+        	UnacceptableTerminationTimeFaultType fault = new UnacceptableTerminationTimeFaultType();
+        	fault.setMinimumTime(ct);
+        	throw new UnacceptableTerminationTimeFault(
+        			"Invalid termination time",
+        			fault);
+    	}
+    	return tt;
+    }
+    
+    protected XMLGregorianCalendar parseTerminationTime(String value) {
+    	try {
+    		Duration d = datatypeFactory.newDuration(value);
+    		XMLGregorianCalendar c = getCurrentTime();
+    		c.add(d);
+    		return c;
+    	} catch (Exception e) { }
+    	try {
+    		Duration d = datatypeFactory.newDurationDayTime(value);
+    		XMLGregorianCalendar c = getCurrentTime();
+    		c.add(d);
+    		return c;
+    	} catch (Exception e) { }
+    	try {
+    		Duration d = datatypeFactory.newDurationYearMonth(value);
+    		XMLGregorianCalendar c = getCurrentTime();
+    		c.add(d);
+    		return c;
+    	} catch (Exception e) { }
+    	try {
+    		return datatypeFactory.newXMLGregorianCalendar(value);
+    	} catch (Exception e) { }
+    	return null;
+    }
+    
+    protected XMLGregorianCalendar getCurrentTime() {
+    	return datatypeFactory.newXMLGregorianCalendar(new GregorianCalendar());
+    }
+
+	public XMLGregorianCalendar getTerminationTime() {
+		return terminationTime;
+	}
+
+	public void setTerminationTime(XMLGregorianCalendar terminationTime) {
+		this.terminationTime = terminationTime;
+	}
+	
+	public void create(Subscribe subscribeRequest) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
+		validateSubscription(subscribeRequest);
+		start();
+	}
+	
+	protected abstract void start() throws SubscribeCreationFailedFault;
+	
+	protected abstract void pause() throws PauseFailedFault;
+	
+    protected abstract void resume() throws ResumeFailedFault;
+
+    protected abstract void renew(XMLGregorianCalendar terminationTime) throws UnacceptableTerminationTimeFault;
+
+    protected void unsubscribe() throws UnableToDestroySubscriptionFault {
+    	try {
+    		unregister();
+    	} catch (EndpointRegistrationException e) {
+    		UnableToDestroySubscriptionFaultType fault = new UnableToDestroySubscriptionFaultType();
+    		throw new UnableToDestroySubscriptionFault("Error unregistering endpoint", fault, e);
+    	}
+    }
+
+	protected String createAddress() {
+		return "http://servicemix.org/wsnotification/Subscription/" + getName();
+	}
+
+	protected void validateSubscription(Subscribe subscribeRequest) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
+		// Check consumer reference
+		consumerReference = subscribeRequest.getConsumerReference();
+		// Check terminationTime
+		if (subscribeRequest.getInitialTerminationTime() != null &&
+			subscribeRequest.getInitialTerminationTime().isNil() == false &&
+			subscribeRequest.getInitialTerminationTime().getValue() != null) {
+			String strTerminationTime = subscribeRequest.getInitialTerminationTime().getValue();
+			terminationTime = validateInitialTerminationTime(strTerminationTime.trim());
+		}
+		// Check filter
+		if (subscribeRequest.getFilter() != null) {
+			for (Object f : subscribeRequest.getFilter().getAny()) {
+				JAXBElement e = null;
+				if (f instanceof JAXBElement) {
+					e = (JAXBElement) f;
+					f = e.getValue();
+				}
+				if (f instanceof TopicExpressionType) {
+					if (!e.getName().equals(QNAME_TOPIC_EXPRESSION)) {
+						InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType();
+						throw new InvalidTopicExpressionFault("Unrecognized TopicExpression: " + e, fault);
+					}
+					topic = (TopicExpressionType) f;
+				} else if (f instanceof QueryExpressionType) {
+					if (e != null && e.getName().equals(QNAME_PRODUCER_PROPERTIES)) {
+						InvalidProducerPropertiesExpressionFaultType fault = new InvalidProducerPropertiesExpressionFaultType();
+						throw new InvalidProducerPropertiesExpressionFault("ProducerProperties are not supported", fault);
+					} else if (e != null && e.getName().equals(QNAME_MESSAGE_CONTENT)) {
+						if (contentFilter != null) {
+							InvalidMessageContentExpressionFaultType fault = new InvalidMessageContentExpressionFaultType();
+							throw new InvalidMessageContentExpressionFault("Only one MessageContent filter can be specified", fault);
+						}
+						contentFilter = (QueryExpressionType) f;
+						// Defaults to XPath 1.0
+						if (contentFilter.getDialect() == null) {
+							contentFilter.setDialect(XPATH1_URI);
+						}
+					} else {
+						InvalidFilterFaultType fault = new InvalidFilterFaultType();
+						throw new InvalidFilterFault("Unrecognized filter: " + (e != null ? e.getName() : f), fault);
+					}
+				} else {
+					InvalidFilterFaultType fault = new InvalidFilterFaultType();
+					throw new InvalidFilterFault("Unrecognized filter: " + (e != null ? e.getName() : f), fault);
+				}
+			}
+		}
+		// Check policy
+		if (subscribeRequest.getSubscriptionPolicy() != null) {
+			for (Object p : subscribeRequest.getSubscriptionPolicy().getAny()) {
+				JAXBElement e = null;
+				if (p instanceof JAXBElement) {
+					e = (JAXBElement) p;
+					p = e.getValue();
+				}
+				if (p instanceof UseRaw) {
+					useRaw = true;
+				} else {
+					InvalidFilterFaultType fault = new InvalidFilterFaultType();
+					throw new InvalidFilterFault("Unrecognized policy: " + p, fault);
+				}
+			}
+		}
+		// Check all parameters
+		if (consumerReference == null) {
+			SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType();
+			throw new SubscribeCreationFailedFault("Invalid ConsumerReference: null", fault);
+		}
+		// TODO check we can resolve endpoint
+		if (topic == null) {
+			InvalidFilterFaultType fault = new InvalidFilterFaultType();
+			throw new InvalidFilterFault("Must specify a topic to subscribe on", fault);
+		}
+		if (contentFilter != null && !contentFilter.getDialect().equals(XPATH1_URI)) {
+			InvalidMessageContentExpressionFaultType fault = new InvalidMessageContentExpressionFaultType();
+			throw new InvalidMessageContentExpressionFault("Unsupported MessageContent dialect: '" + contentFilter.getDialect() + "'", fault);
+		}
+		if (terminationTime != null) {
+			UnacceptableInitialTerminationTimeFaultType fault = new UnacceptableInitialTerminationTimeFaultType();
+	    	throw new UnacceptableInitialTerminationTimeFault(
+	    			"InitialTerminationTime is not supported",
+	    			fault);
+		}
+	}
+
+    public AbstractNotificationBroker getBroker() {
+        return broker;
+    }
+
+    public void setBroker(AbstractNotificationBroker broker) {
+        this.broker = broker;
+    }
+}

Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/AbstractSubscription.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/EndpointManager.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/EndpointManager.java?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/EndpointManager.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/EndpointManager.java Tue Feb 21 15:40:05 2006
@@ -1,25 +1,25 @@
-/*
- * Copyright 2005-2006 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicemix.wsn;
-
-public interface EndpointManager {
-	
-    Object register(String address, 
-                    Object service) throws EndpointRegistrationException;
-	
-	void unregister(Object endpoint) throws EndpointRegistrationException;
-
-}
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.wsn;
+
+public interface EndpointManager {
+	
+    Object register(String address, 
+                    Object service) throws EndpointRegistrationException;
+	
+	void unregister(Object endpoint) throws EndpointRegistrationException;
+
+}

Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/EndpointManager.java
------------------------------------------------------------------------------
    svn:eol-style = native