You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ch...@apache.org on 2006/02/22 00:40:29 UTC

svn commit: r379627 [25/34] - in /incubator/servicemix/trunk: ./ etc/ sandbox/servicemix-wsn-1.2/src/sa/META-INF/ sandbox/servicemix-wsn-1.2/src/su/META-INF/ servicemix-assembly/ servicemix-assembly/src/main/assembly/ servicemix-assembly/src/main/relea...

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsPullPoint.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsPullPoint.java?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsPullPoint.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsPullPoint.java Tue Feb 21 15:40:05 2006
@@ -1,145 +1,145 @@
-/*
- * Copyright 2005-2006 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicemix.wsn.jms;
-
-import java.io.StringReader;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.servicemix.wsn.AbstractPullPoint;
-import org.apache.servicemix.wsn.jaxws.ResourceUnknownFault;
-import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
-import org.oasis_open.docs.wsn.b_2.Notify;
-import org.oasis_open.docs.wsrf.r_2.ResourceUnknownFaultType;
-
-public class JmsPullPoint extends AbstractPullPoint {
-
-	private static Log log = LogFactory.getLog(JmsPullPoint.class);
-	
-	private JAXBContext jaxbContext;
-	private Connection connection;
-	private Session session;
-	private Queue queue;
-	private MessageProducer producer;
-	private MessageConsumer consumer;
-
-	public JmsPullPoint(String name)  {
-		super(name);
-		try {
-			jaxbContext = JAXBContext.newInstance(Notify.class);
-		} catch (JAXBException e) {
-			throw new RuntimeException("Could not create PullEndpoint", e);
-		}
-	}
-	
-	protected void initSession() throws JMSException {
-		if (session == null) {
-			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-			queue = session.createQueue(getName());
-			producer = session.createProducer(queue);
-			consumer = session.createConsumer(queue);
-		}
-	}
-
-	@Override
-	protected synchronized void store(NotificationMessageHolderType messageHolder) {
-		try {
-			initSession();
-            Notify notify = new Notify();
-            notify.getNotificationMessage().add(messageHolder);
-            StringWriter writer = new StringWriter();
-            jaxbContext.createMarshaller().marshal(notify, writer);
-            Message message = session.createTextMessage(writer.toString());
-            producer.send(message);
-		} catch (JMSException e) {
-			log.warn("Error storing message", e);
-			if (session != null) {
-				try {
-					session.close();
-				} catch (JMSException inner) {
-					log.debug("Error closing session", inner);
-				} finally {
-					session = null;
-				}
-			}
-		} catch (JAXBException e) {
-			log.warn("Error storing message", e);
-		}
-	}
-
-	@Override
-	protected synchronized List<NotificationMessageHolderType> getMessages(int max) throws ResourceUnknownFault {
-		Session session = null;
-		try {
-			if (max == 0) {
-				max = 256;
-			}
-			initSession();
-			List<NotificationMessageHolderType> messages = new ArrayList<NotificationMessageHolderType>();
-			for (int i = 0; i < max; i++) {
-				Message msg = consumer.receiveNoWait();
-				if (msg == null) {
-					break;
-				}
-				TextMessage txtMsg = (TextMessage) msg;
-				StringReader reader = new StringReader(txtMsg.getText());
-				Notify notify = (Notify) jaxbContext.createUnmarshaller().unmarshal(reader);
-				messages.addAll(notify.getNotificationMessage());
-			}
-			return messages;
-		} catch (JMSException e) {
-			log.info("Error retrieving messages", e);
-			if (session != null) {
-				try {
-					session.close();
-				} catch (JMSException inner) {
-					log.debug("Error closing session", inner);
-				} finally {
-					session = null;
-				}
-			}
-			ResourceUnknownFaultType fault = new ResourceUnknownFaultType();
-			throw new ResourceUnknownFault("Unable to retrieve messages", fault, e);
-		} catch (JAXBException e) {
-			log.info("Error retrieving messages", e);
-			ResourceUnknownFaultType fault = new ResourceUnknownFaultType();
-			throw new ResourceUnknownFault("Unable to retrieve messages", fault, e);
-		}
-	}
-	
-	public Connection getConnection() {
-		return connection;
-	}
-
-	public void setConnection(Connection connection) {
-		this.connection = connection;
-	}
-
-}
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.wsn.jms;
+
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.wsn.AbstractPullPoint;
+import org.apache.servicemix.wsn.jaxws.ResourceUnknownFault;
+import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
+import org.oasis_open.docs.wsn.b_2.Notify;
+import org.oasis_open.docs.wsrf.r_2.ResourceUnknownFaultType;
+
+public class JmsPullPoint extends AbstractPullPoint {
+
+	private static Log log = LogFactory.getLog(JmsPullPoint.class);
+	
+	private JAXBContext jaxbContext;
+	private Connection connection;
+	private Session session;
+	private Queue queue;
+	private MessageProducer producer;
+	private MessageConsumer consumer;
+
+	public JmsPullPoint(String name)  {
+		super(name);
+		try {
+			jaxbContext = JAXBContext.newInstance(Notify.class);
+		} catch (JAXBException e) {
+			throw new RuntimeException("Could not create PullEndpoint", e);
+		}
+	}
+	
+	protected void initSession() throws JMSException {
+		if (session == null) {
+			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			queue = session.createQueue(getName());
+			producer = session.createProducer(queue);
+			consumer = session.createConsumer(queue);
+		}
+	}
+
+	@Override
+	protected synchronized void store(NotificationMessageHolderType messageHolder) {
+		try {
+			initSession();
+            Notify notify = new Notify();
+            notify.getNotificationMessage().add(messageHolder);
+            StringWriter writer = new StringWriter();
+            jaxbContext.createMarshaller().marshal(notify, writer);
+            Message message = session.createTextMessage(writer.toString());
+            producer.send(message);
+		} catch (JMSException e) {
+			log.warn("Error storing message", e);
+			if (session != null) {
+				try {
+					session.close();
+				} catch (JMSException inner) {
+					log.debug("Error closing session", inner);
+				} finally {
+					session = null;
+				}
+			}
+		} catch (JAXBException e) {
+			log.warn("Error storing message", e);
+		}
+	}
+
+	@Override
+	protected synchronized List<NotificationMessageHolderType> getMessages(int max) throws ResourceUnknownFault {
+		Session session = null;
+		try {
+			if (max == 0) {
+				max = 256;
+			}
+			initSession();
+			List<NotificationMessageHolderType> messages = new ArrayList<NotificationMessageHolderType>();
+			for (int i = 0; i < max; i++) {
+				Message msg = consumer.receiveNoWait();
+				if (msg == null) {
+					break;
+				}
+				TextMessage txtMsg = (TextMessage) msg;
+				StringReader reader = new StringReader(txtMsg.getText());
+				Notify notify = (Notify) jaxbContext.createUnmarshaller().unmarshal(reader);
+				messages.addAll(notify.getNotificationMessage());
+			}
+			return messages;
+		} catch (JMSException e) {
+			log.info("Error retrieving messages", e);
+			if (session != null) {
+				try {
+					session.close();
+				} catch (JMSException inner) {
+					log.debug("Error closing session", inner);
+				} finally {
+					session = null;
+				}
+			}
+			ResourceUnknownFaultType fault = new ResourceUnknownFaultType();
+			throw new ResourceUnknownFault("Unable to retrieve messages", fault, e);
+		} catch (JAXBException e) {
+			log.info("Error retrieving messages", e);
+			ResourceUnknownFaultType fault = new ResourceUnknownFaultType();
+			throw new ResourceUnknownFault("Unable to retrieve messages", fault, e);
+		}
+	}
+	
+	public Connection getConnection() {
+		return connection;
+	}
+
+	public void setConnection(Connection connection) {
+		this.connection = connection;
+	}
+
+}

Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsPullPoint.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsSubscription.java?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsSubscription.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsSubscription.java Tue Feb 21 15:40:05 2006
@@ -1,213 +1,213 @@
-/*
- * Copyright 2005-2006 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicemix.wsn.jms;
-
-import java.io.StringReader;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.xml.datatype.XMLGregorianCalendar;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.xpath.XPath;
-import javax.xml.xpath.XPathConstants;
-import javax.xml.xpath.XPathExpression;
-import javax.xml.xpath.XPathExpressionException;
-import javax.xml.xpath.XPathFactory;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.servicemix.wsn.AbstractSubscription;
-import org.apache.servicemix.wsn.jaxws.InvalidFilterFault;
-import org.apache.servicemix.wsn.jaxws.InvalidMessageContentExpressionFault;
-import org.apache.servicemix.wsn.jaxws.InvalidProducerPropertiesExpressionFault;
-import org.apache.servicemix.wsn.jaxws.InvalidTopicExpressionFault;
-import org.apache.servicemix.wsn.jaxws.PauseFailedFault;
-import org.apache.servicemix.wsn.jaxws.ResumeFailedFault;
-import org.apache.servicemix.wsn.jaxws.SubscribeCreationFailedFault;
-import org.apache.servicemix.wsn.jaxws.TopicExpressionDialectUnknownFault;
-import org.apache.servicemix.wsn.jaxws.TopicNotSupportedFault;
-import org.apache.servicemix.wsn.jaxws.UnableToDestroySubscriptionFault;
-import org.apache.servicemix.wsn.jaxws.UnacceptableInitialTerminationTimeFault;
-import org.apache.servicemix.wsn.jaxws.UnacceptableTerminationTimeFault;
-import org.oasis_open.docs.wsn.b_2.InvalidTopicExpressionFaultType;
-import org.oasis_open.docs.wsn.b_2.PauseFailedFaultType;
-import org.oasis_open.docs.wsn.b_2.ResumeFailedFaultType;
-import org.oasis_open.docs.wsn.b_2.Subscribe;
-import org.oasis_open.docs.wsn.b_2.SubscribeCreationFailedFaultType;
-import org.oasis_open.docs.wsn.b_2.UnableToDestroySubscriptionFaultType;
-import org.oasis_open.docs.wsn.b_2.UnacceptableTerminationTimeFaultType;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.xml.sax.InputSource;
-
-public abstract class JmsSubscription extends AbstractSubscription implements MessageListener {
-
-	private static Log log = LogFactory.getLog(JmsSubscription.class);
-	
-	private Connection connection;
-	private Session session;
-    private JmsTopicExpressionConverter topicConverter;
-    private Topic jmsTopic;
-	
-	public JmsSubscription(String name) {
-		super(name);
-		topicConverter = new JmsTopicExpressionConverter();
-	}
-
-	protected void start() throws SubscribeCreationFailedFault {
-		try {
-			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-			MessageConsumer consumer = session.createConsumer(jmsTopic);
-            consumer.setMessageListener(this);
-		} catch (JMSException e) {
-			SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType();
-			throw new SubscribeCreationFailedFault("Error starting subscription", fault, e);
-		}
-	}
-	
-	@Override
-	protected void validateSubscription(Subscribe subscribeRequest) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
-		super.validateSubscription(subscribeRequest);
-		try {
-			jmsTopic = topicConverter.toActiveMQTopic(topic);
-		} catch (InvalidTopicException e) {
-			InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType();
-			throw new InvalidTopicExpressionFault(e.getMessage(), fault);
-		}
-	}
-	
-	@Override
-	protected void pause() throws PauseFailedFault {
-		if (session == null) {
-			PauseFailedFaultType fault = new PauseFailedFaultType();
-			throw new PauseFailedFault("Subscription is already paused", fault);
-		} else {
-			try {
-				session.close();
-			} catch (JMSException e) {
-				PauseFailedFaultType fault = new PauseFailedFaultType();
-				throw new PauseFailedFault("Error pausing subscription", fault, e);
-			} finally {
-				session = null;
-			}
-		}
-	}
-
-	@Override
-	protected void resume() throws ResumeFailedFault {
-		if (session != null) {
-			ResumeFailedFaultType fault = new ResumeFailedFaultType();
-			throw new ResumeFailedFault("Subscription is already running", fault);
-		} else {
-			try {
-				session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-				MessageConsumer consumer = session.createConsumer(jmsTopic);
-	            consumer.setMessageListener(this);
-			} catch (JMSException e) {
-				ResumeFailedFaultType fault = new ResumeFailedFaultType();
-				throw new ResumeFailedFault("Error resuming subscription", fault, e);
-			}
-		}
-	}
-
-	@Override
-	protected void renew(XMLGregorianCalendar terminationTime) throws UnacceptableTerminationTimeFault {
-		UnacceptableTerminationTimeFaultType fault = new UnacceptableTerminationTimeFaultType();
-    	throw new UnacceptableTerminationTimeFault(
-    			"TerminationTime is not supported",
-    			fault);
-	}
-
-	@Override
-	protected void unsubscribe() throws UnableToDestroySubscriptionFault {
-		super.unsubscribe();
-		if (session != null) {
-			try {
-				session.close();
-			} catch (JMSException e) {
-				UnableToDestroySubscriptionFaultType fault = new UnableToDestroySubscriptionFaultType();
-				throw new UnableToDestroySubscriptionFault("Unable to unsubscribe", fault, e);
-			} finally {
-				session = null;
-			}
-		}
-	}
-
-	public Connection getConnection() {
-		return connection;
-	}
-
-	public void setConnection(Connection connection) {
-		this.connection = connection;
-	}
-
-	public void onMessage(Message jmsMessage) {
-		try {
-			TextMessage text = (TextMessage) jmsMessage;
-			DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
-			factory.setNamespaceAware(true);
-			Document doc = factory.newDocumentBuilder().parse(new InputSource(new StringReader(text.getText())));
-			Element root = doc.getDocumentElement();
-			Element holder = (Element) root.getElementsByTagNameNS(WSN_URI, "NotificationMessage").item(0);
-			Element message = (Element) holder.getElementsByTagNameNS(WSN_URI, "Message").item(0);
-			Element content = null;
-			for (int i = 0; i < message.getChildNodes().getLength(); i++) {
-				if (message.getChildNodes().item(i) instanceof Element) {
-					content = (Element) message.getChildNodes().item(i);
-					break;
-				}
-			}
-	        boolean match = doFilter(content);
-			if (match) {
-				if (useRaw) {
-					doNotify(content);
-				} else {
-					doNotify(root);
-				}
-			}
-		} catch (Exception e) {
-			log.warn("Error notifying consumer", e);
-		}
-	}
-	
-	protected boolean doFilter(Element content) {
-		if (contentFilter != null) {
-			if (!contentFilter.getDialect().equals(XPATH1_URI)) {
-				throw new IllegalStateException("Unsupported dialect: " + contentFilter.getDialect());
-			}
-			try {
-				XPathFactory xpfactory = XPathFactory.newInstance();
-				XPath xpath = xpfactory.newXPath();
-				XPathExpression exp = xpath.compile(contentFilter.getContent().get(0).toString());
-				Boolean ret = (Boolean) exp.evaluate(content, XPathConstants.BOOLEAN);
-				return ret.booleanValue();
-			} catch (XPathExpressionException e) {
-				log.warn("Could not filter notification", e);
-			}
-			return false;
-		}
-		return true;
-	}
-	
-	protected abstract void doNotify(Element content);
-
-}
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.wsn.jms;
+
+import java.io.StringReader;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.xml.datatype.XMLGregorianCalendar;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpression;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.wsn.AbstractSubscription;
+import org.apache.servicemix.wsn.jaxws.InvalidFilterFault;
+import org.apache.servicemix.wsn.jaxws.InvalidMessageContentExpressionFault;
+import org.apache.servicemix.wsn.jaxws.InvalidProducerPropertiesExpressionFault;
+import org.apache.servicemix.wsn.jaxws.InvalidTopicExpressionFault;
+import org.apache.servicemix.wsn.jaxws.PauseFailedFault;
+import org.apache.servicemix.wsn.jaxws.ResumeFailedFault;
+import org.apache.servicemix.wsn.jaxws.SubscribeCreationFailedFault;
+import org.apache.servicemix.wsn.jaxws.TopicExpressionDialectUnknownFault;
+import org.apache.servicemix.wsn.jaxws.TopicNotSupportedFault;
+import org.apache.servicemix.wsn.jaxws.UnableToDestroySubscriptionFault;
+import org.apache.servicemix.wsn.jaxws.UnacceptableInitialTerminationTimeFault;
+import org.apache.servicemix.wsn.jaxws.UnacceptableTerminationTimeFault;
+import org.oasis_open.docs.wsn.b_2.InvalidTopicExpressionFaultType;
+import org.oasis_open.docs.wsn.b_2.PauseFailedFaultType;
+import org.oasis_open.docs.wsn.b_2.ResumeFailedFaultType;
+import org.oasis_open.docs.wsn.b_2.Subscribe;
+import org.oasis_open.docs.wsn.b_2.SubscribeCreationFailedFaultType;
+import org.oasis_open.docs.wsn.b_2.UnableToDestroySubscriptionFaultType;
+import org.oasis_open.docs.wsn.b_2.UnacceptableTerminationTimeFaultType;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.xml.sax.InputSource;
+
+public abstract class JmsSubscription extends AbstractSubscription implements MessageListener {
+
+	private static Log log = LogFactory.getLog(JmsSubscription.class);
+	
+	private Connection connection;
+	private Session session;
+    private JmsTopicExpressionConverter topicConverter;
+    private Topic jmsTopic;
+	
+	public JmsSubscription(String name) {
+		super(name);
+		topicConverter = new JmsTopicExpressionConverter();
+	}
+
+	protected void start() throws SubscribeCreationFailedFault {
+		try {
+			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			MessageConsumer consumer = session.createConsumer(jmsTopic);
+            consumer.setMessageListener(this);
+		} catch (JMSException e) {
+			SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType();
+			throw new SubscribeCreationFailedFault("Error starting subscription", fault, e);
+		}
+	}
+	
+	@Override
+	protected void validateSubscription(Subscribe subscribeRequest) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
+		super.validateSubscription(subscribeRequest);
+		try {
+			jmsTopic = topicConverter.toActiveMQTopic(topic);
+		} catch (InvalidTopicException e) {
+			InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType();
+			throw new InvalidTopicExpressionFault(e.getMessage(), fault);
+		}
+	}
+	
+	@Override
+	protected void pause() throws PauseFailedFault {
+		if (session == null) {
+			PauseFailedFaultType fault = new PauseFailedFaultType();
+			throw new PauseFailedFault("Subscription is already paused", fault);
+		} else {
+			try {
+				session.close();
+			} catch (JMSException e) {
+				PauseFailedFaultType fault = new PauseFailedFaultType();
+				throw new PauseFailedFault("Error pausing subscription", fault, e);
+			} finally {
+				session = null;
+			}
+		}
+	}
+
+	@Override
+	protected void resume() throws ResumeFailedFault {
+		if (session != null) {
+			ResumeFailedFaultType fault = new ResumeFailedFaultType();
+			throw new ResumeFailedFault("Subscription is already running", fault);
+		} else {
+			try {
+				session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+				MessageConsumer consumer = session.createConsumer(jmsTopic);
+	            consumer.setMessageListener(this);
+			} catch (JMSException e) {
+				ResumeFailedFaultType fault = new ResumeFailedFaultType();
+				throw new ResumeFailedFault("Error resuming subscription", fault, e);
+			}
+		}
+	}
+
+	@Override
+	protected void renew(XMLGregorianCalendar terminationTime) throws UnacceptableTerminationTimeFault {
+		UnacceptableTerminationTimeFaultType fault = new UnacceptableTerminationTimeFaultType();
+    	throw new UnacceptableTerminationTimeFault(
+    			"TerminationTime is not supported",
+    			fault);
+	}
+
+	@Override
+	protected void unsubscribe() throws UnableToDestroySubscriptionFault {
+		super.unsubscribe();
+		if (session != null) {
+			try {
+				session.close();
+			} catch (JMSException e) {
+				UnableToDestroySubscriptionFaultType fault = new UnableToDestroySubscriptionFaultType();
+				throw new UnableToDestroySubscriptionFault("Unable to unsubscribe", fault, e);
+			} finally {
+				session = null;
+			}
+		}
+	}
+
+	public Connection getConnection() {
+		return connection;
+	}
+
+	public void setConnection(Connection connection) {
+		this.connection = connection;
+	}
+
+	public void onMessage(Message jmsMessage) {
+		try {
+			TextMessage text = (TextMessage) jmsMessage;
+			DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+			factory.setNamespaceAware(true);
+			Document doc = factory.newDocumentBuilder().parse(new InputSource(new StringReader(text.getText())));
+			Element root = doc.getDocumentElement();
+			Element holder = (Element) root.getElementsByTagNameNS(WSN_URI, "NotificationMessage").item(0);
+			Element message = (Element) holder.getElementsByTagNameNS(WSN_URI, "Message").item(0);
+			Element content = null;
+			for (int i = 0; i < message.getChildNodes().getLength(); i++) {
+				if (message.getChildNodes().item(i) instanceof Element) {
+					content = (Element) message.getChildNodes().item(i);
+					break;
+				}
+			}
+	        boolean match = doFilter(content);
+			if (match) {
+				if (useRaw) {
+					doNotify(content);
+				} else {
+					doNotify(root);
+				}
+			}
+		} catch (Exception e) {
+			log.warn("Error notifying consumer", e);
+		}
+	}
+	
+	protected boolean doFilter(Element content) {
+		if (contentFilter != null) {
+			if (!contentFilter.getDialect().equals(XPATH1_URI)) {
+				throw new IllegalStateException("Unsupported dialect: " + contentFilter.getDialect());
+			}
+			try {
+				XPathFactory xpfactory = XPathFactory.newInstance();
+				XPath xpath = xpfactory.newXPath();
+				XPathExpression exp = xpath.compile(contentFilter.getContent().get(0).toString());
+				Boolean ret = (Boolean) exp.evaluate(content, XPathConstants.BOOLEAN);
+				return ret.booleanValue();
+			} catch (XPathExpressionException e) {
+				log.warn("Could not filter notification", e);
+			}
+			return false;
+		}
+		return true;
+	}
+	
+	protected abstract void doNotify(Element content);
+
+}

Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsSubscription.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsTopicExpressionConverter.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsTopicExpressionConverter.java?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsTopicExpressionConverter.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsTopicExpressionConverter.java Tue Feb 21 15:40:05 2006
@@ -1,93 +1,93 @@
-/*
- * Copyright 2005-2006 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicemix.wsn.jms;
-
-import java.util.Iterator;
-import java.util.List;
-
-import javax.jms.Topic;
-import javax.xml.namespace.QName;
-
-import org.apache.activemq.command.ActiveMQTopic;
-import org.oasis_open.docs.wsn.b_2.TopicExpressionType;
-
-public class JmsTopicExpressionConverter {
-
-    public static final String SIMPLE_DIALECT = "http://docs.oasis-open.org/wsn/t-1/TopicExpression/Simple";
-
-    public TopicExpressionType toTopicExpression(Topic topic) {
-        return toTopicExpression(topic.toString());
-    }
-
-    public TopicExpressionType toTopicExpression(ActiveMQTopic topic) {
-        return toTopicExpression(topic.getPhysicalName());
-    }
-
-    public TopicExpressionType toTopicExpression(String name) {
-        TopicExpressionType answer = new TopicExpressionType();
-        answer.getContent().add(QName.valueOf(name));
-        answer.setDialect(SIMPLE_DIALECT);
-        return answer;
-    }
-
-    public ActiveMQTopic toActiveMQTopic(List<TopicExpressionType> topics) throws InvalidTopicException {
-        if (topics == null || topics.size() == 0) {
-            return null;
-        }
-        int size = topics.size();
-        ActiveMQTopic childrenDestinations[] = new ActiveMQTopic[size];
-        for (int i = 0; i < size; i++) {
-            childrenDestinations[i] = toActiveMQTopic(topics.get(i));
-        }
-
-        ActiveMQTopic topic = new ActiveMQTopic();
-        topic.setCompositeDestinations(childrenDestinations);
-        return topic;
-    }
-
-    public ActiveMQTopic toActiveMQTopic(TopicExpressionType topic) throws InvalidTopicException {
-        String dialect = topic.getDialect();
-        if (dialect == null || SIMPLE_DIALECT.equals(dialect)) {
-            for (Iterator iter = topic.getContent().iterator(); iter.hasNext();) {
-                ActiveMQTopic answer = createActiveMQTopicFromContent(iter.next());
-                if (answer != null) {
-                    return answer;
-                }
-            }
-            throw new InvalidTopicException("No topic name available topic: " + topic);
-        }
-        else {
-        	throw new InvalidTopicException("Topic dialect: " + dialect + " not supported");
-        }
-    }
-
-    // Implementation methods
-    // -------------------------------------------------------------------------
-    protected ActiveMQTopic createActiveMQTopicFromContent(Object contentItem) {
-        if (contentItem instanceof String) {
-            return new ActiveMQTopic(((String) contentItem).trim());
-        }
-        if (contentItem instanceof QName) {
-            return createActiveMQTopicFromQName((QName) contentItem);
-        }
-        return null;
-    }
-
-    protected ActiveMQTopic createActiveMQTopicFromQName(QName qName) {
-        return new ActiveMQTopic(qName.toString());
-    }
-
-}
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.wsn.jms;
+
+import java.util.Iterator;
+import java.util.List;
+
+import javax.jms.Topic;
+import javax.xml.namespace.QName;
+
+import org.apache.activemq.command.ActiveMQTopic;
+import org.oasis_open.docs.wsn.b_2.TopicExpressionType;
+
+public class JmsTopicExpressionConverter {
+
+    public static final String SIMPLE_DIALECT = "http://docs.oasis-open.org/wsn/t-1/TopicExpression/Simple";
+
+    public TopicExpressionType toTopicExpression(Topic topic) {
+        return toTopicExpression(topic.toString());
+    }
+
+    public TopicExpressionType toTopicExpression(ActiveMQTopic topic) {
+        return toTopicExpression(topic.getPhysicalName());
+    }
+
+    public TopicExpressionType toTopicExpression(String name) {
+        TopicExpressionType answer = new TopicExpressionType();
+        answer.getContent().add(QName.valueOf(name));
+        answer.setDialect(SIMPLE_DIALECT);
+        return answer;
+    }
+
+    public ActiveMQTopic toActiveMQTopic(List<TopicExpressionType> topics) throws InvalidTopicException {
+        if (topics == null || topics.size() == 0) {
+            return null;
+        }
+        int size = topics.size();
+        ActiveMQTopic childrenDestinations[] = new ActiveMQTopic[size];
+        for (int i = 0; i < size; i++) {
+            childrenDestinations[i] = toActiveMQTopic(topics.get(i));
+        }
+
+        ActiveMQTopic topic = new ActiveMQTopic();
+        topic.setCompositeDestinations(childrenDestinations);
+        return topic;
+    }
+
+    public ActiveMQTopic toActiveMQTopic(TopicExpressionType topic) throws InvalidTopicException {
+        String dialect = topic.getDialect();
+        if (dialect == null || SIMPLE_DIALECT.equals(dialect)) {
+            for (Iterator iter = topic.getContent().iterator(); iter.hasNext();) {
+                ActiveMQTopic answer = createActiveMQTopicFromContent(iter.next());
+                if (answer != null) {
+                    return answer;
+                }
+            }
+            throw new InvalidTopicException("No topic name available topic: " + topic);
+        }
+        else {
+        	throw new InvalidTopicException("Topic dialect: " + dialect + " not supported");
+        }
+    }
+
+    // Implementation methods
+    // -------------------------------------------------------------------------
+    protected ActiveMQTopic createActiveMQTopicFromContent(Object contentItem) {
+        if (contentItem instanceof String) {
+            return new ActiveMQTopic(((String) contentItem).trim());
+        }
+        if (contentItem instanceof QName) {
+            return createActiveMQTopicFromQName((QName) contentItem);
+        }
+        return null;
+    }
+
+    protected ActiveMQTopic createActiveMQTopicFromQName(QName qName) {
+        return new ActiveMQTopic(qName.toString());
+    }
+
+}

Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsTopicExpressionConverter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/package.html
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/package.html?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/package.html (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/package.html Tue Feb 21 15:40:05 2006
@@ -1,9 +1,9 @@
-<html>
-<head>
-</head>
-<body>
-
-JMS based implementation of the NotificationBroker and related services.
-
-</body>
-</html>
+<html>
+<head>
+</head>
+<body>
+
+JMS based implementation of the NotificationBroker and related services.
+
+</body>
+</html>

Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/package.html
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/package.html?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/package.html (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/package.html Tue Feb 21 15:40:05 2006
@@ -1,9 +1,9 @@
-<html>
-<head>
-</head>
-<body>
-
-This package provides base classes for WS-Notification implementations.
-
-</body>
-</html>
+<html>
+<head>
+</head>
+<body>
+
+This package provides base classes for WS-Notification implementations.
+
+</body>
+</html>

Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/package.html
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/resources/META-INF/DISCLAIMER.txt
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/resources/META-INF/DISCLAIMER.txt?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/resources/META-INF/DISCLAIMER.txt (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/resources/META-INF/DISCLAIMER.txt Tue Feb 21 15:40:05 2006
@@ -1,7 +1,7 @@
-ActiveMQ is an effort undergoing incubation at the Apache Software Foundation
-(ASF), sponsored by the Geronimo PMC. Incubation is required of all newly
-accepted projects until a further review indicates that the infrastructure,
-communications, and decision making process have stabilized in a manner
-consistent with other successful ASF projects. While incubation status is not
-necessarily a reflection of the completeness or stability of the code, it does
+ActiveMQ is an effort undergoing incubation at the Apache Software Foundation
+(ASF), sponsored by the Geronimo PMC. Incubation is required of all newly
+accepted projects until a further review indicates that the infrastructure,
+communications, and decision making process have stabilized in a manner
+consistent with other successful ASF projects. While incubation status is not
+necessarily a reflection of the completeness or stability of the code, it does
 indicate that the project has yet to be fully endorsed by the ASF.

Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/main/resources/META-INF/DISCLAIMER.txt
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/main/resources/META-INF/LICENSE.txt
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/main/wsdl/catalog.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/SubscriptionTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/SubscriptionTest.java?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/SubscriptionTest.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/SubscriptionTest.java Tue Feb 21 15:40:05 2006
@@ -1,140 +1,140 @@
-/*
- * Copyright 2005-2006 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicemix.wsn;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.datatype.XMLGregorianCalendar;
-
-import junit.framework.TestCase;
-
-import org.apache.servicemix.wsn.jaxws.InvalidFilterFault;
-import org.apache.servicemix.wsn.jaxws.InvalidProducerPropertiesExpressionFault;
-import org.apache.servicemix.wsn.jaxws.PauseFailedFault;
-import org.apache.servicemix.wsn.jaxws.ResumeFailedFault;
-import org.apache.servicemix.wsn.jaxws.SubscribeCreationFailedFault;
-import org.apache.servicemix.wsn.jaxws.UnacceptableInitialTerminationTimeFault;
-import org.apache.servicemix.wsn.jaxws.UnacceptableTerminationTimeFault;
-import org.oasis_open.docs.wsn.b_2.Subscribe;
-
-public class SubscriptionTest extends TestCase {
-
-	private JAXBContext context;
-	private Unmarshaller unmarshaller;
-	private AbstractSubscription subscription;
-	
-	protected void setUp() throws Exception {
-		context = JAXBContext.newInstance(Subscribe.class);
-		unmarshaller = context.createUnmarshaller();
-		subscription = new DummySubscription("mySubscription");
-	}
-	
-	protected Subscribe getSubscription(String file) throws JAXBException, IOException {
-		InputStream is = getClass().getResourceAsStream(file);
-		Subscribe subscribe = (Subscribe) unmarshaller.unmarshal(is);
-		is.close();
-		return subscribe;
-	}
-	
-	public void testWithNilITT() throws Exception {
-		Subscribe subscribe = getSubscription("subscribe-nil-itt.xml");
-		subscription.validateSubscription(subscribe);
-	}
-	
-	public void testWithAbsoluteITT() throws Exception {
-		Subscribe subscribe = getSubscription("subscribe-abs-itt.xml");
-		try {
-			subscription.validateSubscription(subscribe);
-			fail("Invalid initial termination time used. Fault was expected.");
-		} catch (UnacceptableInitialTerminationTimeFault e) {
-			// OK
-		}
-	}
-	
-	public void testWithEmptyITT() throws Exception {
-		Subscribe subscribe = getSubscription("subscribe-empty-itt.xml");
-		try {
-			subscription.validateSubscription(subscribe);
-			fail("Invalid initial termination time used. Fault was expected.");
-		} catch (UnacceptableInitialTerminationTimeFault e) {
-			// OK
-		}
-	}
-	
-	public void testWithNoITT() throws Exception {
-		Subscribe subscribe = getSubscription("subscribe-no-itt.xml");
-		subscription.validateSubscription(subscribe);
-	}
-	
-	public void testWithUseRaw() throws Exception {
-		Subscribe subscribe = getSubscription("subscribe-raw.xml");
-		subscription.validateSubscription(subscribe);
-	}
-	
-	public void testWithProducerProperties() throws Exception {
-		Subscribe subscribe = getSubscription("subscribe-pp.xml");
-		try {
-			subscription.validateSubscription(subscribe);
-			fail("ProducerProperties used. Fault was expected.");
-		} catch (InvalidProducerPropertiesExpressionFault e) {
-			// OK
-		}
-	}
-	
-	public void testWithNoTopic() throws Exception {
-		Subscribe subscribe = getSubscription("subscribe-no-topic.xml");
-		try {
-			subscription.validateSubscription(subscribe);
-			fail("ProducerProperties used. Fault was expected.");
-		} catch (InvalidFilterFault e) {
-			// OK
-		}
-	}
-	
-	public void testWithEPR() throws Exception {
-		Subscribe subscribe = getSubscription("subscribe-epr.xml");
-		subscription.validateSubscription(subscribe);
-	}
-	
-	public static class DummySubscription extends AbstractSubscription {
-
-		public DummySubscription(String name) {
-			super(name);
-		}
-
-		@Override
-		protected void start() throws SubscribeCreationFailedFault {
-		}
-
-		@Override
-		protected void pause() throws PauseFailedFault {
-		}
-
-		@Override
-		protected void resume() throws ResumeFailedFault {
-		}
-
-		@Override
-		protected void renew(XMLGregorianCalendar terminationTime) throws UnacceptableTerminationTimeFault {
-		}
-		
-	}
-	
-}
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.wsn;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.datatype.XMLGregorianCalendar;
+
+import junit.framework.TestCase;
+
+import org.apache.servicemix.wsn.jaxws.InvalidFilterFault;
+import org.apache.servicemix.wsn.jaxws.InvalidProducerPropertiesExpressionFault;
+import org.apache.servicemix.wsn.jaxws.PauseFailedFault;
+import org.apache.servicemix.wsn.jaxws.ResumeFailedFault;
+import org.apache.servicemix.wsn.jaxws.SubscribeCreationFailedFault;
+import org.apache.servicemix.wsn.jaxws.UnacceptableInitialTerminationTimeFault;
+import org.apache.servicemix.wsn.jaxws.UnacceptableTerminationTimeFault;
+import org.oasis_open.docs.wsn.b_2.Subscribe;
+
+public class SubscriptionTest extends TestCase {
+
+	private JAXBContext context;
+	private Unmarshaller unmarshaller;
+	private AbstractSubscription subscription;
+	
+	protected void setUp() throws Exception {
+		context = JAXBContext.newInstance(Subscribe.class);
+		unmarshaller = context.createUnmarshaller();
+		subscription = new DummySubscription("mySubscription");
+	}
+	
+	protected Subscribe getSubscription(String file) throws JAXBException, IOException {
+		InputStream is = getClass().getResourceAsStream(file);
+		Subscribe subscribe = (Subscribe) unmarshaller.unmarshal(is);
+		is.close();
+		return subscribe;
+	}
+	
+	public void testWithNilITT() throws Exception {
+		Subscribe subscribe = getSubscription("subscribe-nil-itt.xml");
+		subscription.validateSubscription(subscribe);
+	}
+	
+	public void testWithAbsoluteITT() throws Exception {
+		Subscribe subscribe = getSubscription("subscribe-abs-itt.xml");
+		try {
+			subscription.validateSubscription(subscribe);
+			fail("Invalid initial termination time used. Fault was expected.");
+		} catch (UnacceptableInitialTerminationTimeFault e) {
+			// OK
+		}
+	}
+	
+	public void testWithEmptyITT() throws Exception {
+		Subscribe subscribe = getSubscription("subscribe-empty-itt.xml");
+		try {
+			subscription.validateSubscription(subscribe);
+			fail("Invalid initial termination time used. Fault was expected.");
+		} catch (UnacceptableInitialTerminationTimeFault e) {
+			// OK
+		}
+	}
+	
+	public void testWithNoITT() throws Exception {
+		Subscribe subscribe = getSubscription("subscribe-no-itt.xml");
+		subscription.validateSubscription(subscribe);
+	}
+	
+	public void testWithUseRaw() throws Exception {
+		Subscribe subscribe = getSubscription("subscribe-raw.xml");
+		subscription.validateSubscription(subscribe);
+	}
+	
+	public void testWithProducerProperties() throws Exception {
+		Subscribe subscribe = getSubscription("subscribe-pp.xml");
+		try {
+			subscription.validateSubscription(subscribe);
+			fail("ProducerProperties used. Fault was expected.");
+		} catch (InvalidProducerPropertiesExpressionFault e) {
+			// OK
+		}
+	}
+	
+	public void testWithNoTopic() throws Exception {
+		Subscribe subscribe = getSubscription("subscribe-no-topic.xml");
+		try {
+			subscription.validateSubscription(subscribe);
+			fail("ProducerProperties used. Fault was expected.");
+		} catch (InvalidFilterFault e) {
+			// OK
+		}
+	}
+	
+	public void testWithEPR() throws Exception {
+		Subscribe subscribe = getSubscription("subscribe-epr.xml");
+		subscription.validateSubscription(subscribe);
+	}
+	
+	public static class DummySubscription extends AbstractSubscription {
+
+		public DummySubscription(String name) {
+			super(name);
+		}
+
+		@Override
+		protected void start() throws SubscribeCreationFailedFault {
+		}
+
+		@Override
+		protected void pause() throws PauseFailedFault {
+		}
+
+		@Override
+		protected void resume() throws ResumeFailedFault {
+		}
+
+		@Override
+		protected void renew(XMLGregorianCalendar terminationTime) throws UnacceptableTerminationTimeFault {
+		}
+		
+	}
+	
+}

Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/SubscriptionTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/component/WSNComponentTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/component/WSNComponentTest.java?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/component/WSNComponentTest.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/component/WSNComponentTest.java Tue Feb 21 15:40:05 2006
@@ -1,392 +1,392 @@
-/*
- * Copyright 2005-2006 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicemix.wsn.component;
-
-import java.io.File;
-import java.io.StringReader;
-import java.io.StringWriter;
-import java.net.URI;
-import java.net.URL;
-import java.util.List;
-
-import javax.jbi.JBIException;
-import javax.jbi.messaging.ExchangeStatus;
-import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessagingException;
-import javax.jbi.messaging.NormalizedMessage;
-import javax.xml.bind.JAXBContext;
-import javax.xml.namespace.QName;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.transform.Source;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.servicemix.MessageExchangeListener;
-import org.apache.servicemix.components.util.ComponentSupport;
-import org.apache.servicemix.jbi.container.ActivationSpec;
-import org.apache.servicemix.jbi.container.JBIContainer;
-import org.apache.servicemix.jbi.jaxp.SourceTransformer;
-import org.apache.servicemix.jbi.jaxp.StringSource;
-import org.apache.servicemix.tck.Receiver;
-import org.apache.servicemix.tck.ReceiverComponent;
-import org.apache.servicemix.wsn.client.AbstractWSAClient;
-import org.apache.servicemix.wsn.client.CreatePullPoint;
-import org.apache.servicemix.wsn.client.NotificationBroker;
-import org.apache.servicemix.wsn.client.Publisher;
-import org.apache.servicemix.wsn.client.PullPoint;
-import org.apache.servicemix.wsn.client.Subscription;
-import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
-import org.oasis_open.docs.wsn.b_2.Notify;
-import org.oasis_open.docs.wsn.b_2.Subscribe;
-import org.oasis_open.docs.wsn.b_2.SubscribeResponse;
-import org.oasis_open.docs.wsn.b_2.Unsubscribe;
-import org.oasis_open.docs.wsn.b_2.UnsubscribeResponse;
-import org.w3._2005._08.addressing.AttributedURIType;
-import org.w3._2005._08.addressing.EndpointReferenceType;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.Node;
-import org.xml.sax.InputSource;
-
-public class WSNComponentTest extends TestCase {
-	
-	public static QName NOTIFICATION_BROKER = new QName("http://servicemix.org/wsnotification", "NotificationBroker"); 
-	
-	private JBIContainer jbi;
-	private BrokerService jmsBroker;
-	private NotificationBroker wsnBroker;
-    private CreatePullPoint wsnCreatePullPoint;
-    private WSNComponent wsnComponent;
-	
-	protected void setUp() throws Exception {
-		jmsBroker = new BrokerService();
-		jmsBroker.setPersistent(false);
-		jmsBroker.addConnector("vm://localhost");
-		jmsBroker.start();
-
-		jbi = new JBIContainer();
-		jbi.setEmbedded(true);
-		jbi.init();
-		jbi.start();
-		
-		wsnComponent = new WSNComponent();
-        wsnComponent.setConnectionFactory(new ActiveMQConnectionFactory("vm://localhost"));
-		ActivationSpec as = new ActivationSpec();
-		as.setComponentName("servicemix-wsn2005");
-		as.setComponent(wsnComponent);
-		jbi.activateComponent(as);
-		
-		wsnBroker = new NotificationBroker(jbi);
-        wsnCreatePullPoint = new CreatePullPoint(jbi);
-	}
-	
-	protected void tearDown() throws Exception {
-		if (jbi != null) {
-			jbi.shutDown();
-		}
-		if (jmsBroker != null) {
-			jmsBroker.stop();
-		}
-	}
-	
-	public void testInvalidSubscribription() throws Exception {
-		try {
-			wsnBroker.subscribe(null, null, null);
-			fail("Expected an exception");
-		} catch (JBIException e) {
-			// ok
-		}
-	}
-	
-	public void testNotify() throws Exception {
-		ReceiverComponent receiver = new ReceiverComponent();
-		jbi.activateComponent(receiver, "receiver");
-		
-		EndpointReferenceType consumer = createEPR(ReceiverComponent.SERVICE, ReceiverComponent.ENDPOINT);
-		wsnBroker.subscribe(consumer, "myTopic", null);
-		
-		wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
-		// Wait for notification
-		Thread.sleep(50);
-		
-		receiver.getMessageList().assertMessagesReceived(1);
-		NormalizedMessage msg = (NormalizedMessage) receiver.getMessageList().getMessages().get(0);
-		Node node = new SourceTransformer().toDOMNode(msg);
-		assertEquals("Notify", node.getLocalName());
-		
-		// Wait for acks to be processed
-		Thread.sleep(50);
-	}
-
-	public void testRawNotify() throws Exception {
-		ReceiverComponent receiver = new ReceiverComponent();
-		jbi.activateComponent(receiver, "receiver");
-		
-		EndpointReferenceType consumer = createEPR(ReceiverComponent.SERVICE, ReceiverComponent.ENDPOINT);
-		wsnBroker.subscribe(consumer, "myTopic", null, true);
-		
-		wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
-		// Wait for notification
-		Thread.sleep(50);
-		
-		receiver.getMessageList().assertMessagesReceived(1);
-		NormalizedMessage msg = (NormalizedMessage) receiver.getMessageList().getMessages().get(0);
-		Node node = new SourceTransformer().toDOMNode(msg);
-		assertEquals("hello", node.getLocalName());
-		
-		// Wait for acks to be processed
-		Thread.sleep(50);
-	}
-
-	public void testUnsubscribe() throws Exception {
-		PullPoint pullPoint = wsnCreatePullPoint.createPullPoint();
-		Subscription subscription = wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null);
-		
-		wsnBroker.notify("myTopic", new Notify());
-		// Wait for notification
-		Thread.sleep(50);
-		
-		assertEquals(1, pullPoint.getMessages(0).size());
-
-		subscription.unsubscribe();
-		
-		wsnBroker.notify("myTopic", new Notify());
-		// Wait for notification
-		Thread.sleep(50);
-		
-		assertEquals(0, pullPoint.getMessages(0).size());
-
-		// Wait for acks to be processed
-		Thread.sleep(50);
-	}
-
-	public void testPauseResume() throws Exception {
-		PullPoint pullPoint = wsnCreatePullPoint.createPullPoint();
-		Subscription subscription = wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null);
-		
-		wsnBroker.notify("myTopic", new Notify());
-		// Wait for notification
-		Thread.sleep(50);
-		
-		assertEquals(1, pullPoint.getMessages(0).size());
-
-		subscription.pause();
-		
-		wsnBroker.notify("myTopic", new Notify());
-		// Wait for notification
-		Thread.sleep(50);
-		
-		assertEquals(0, pullPoint.getMessages(0).size());
-
-		subscription.resume();
-		
-		wsnBroker.notify("myTopic", new Notify());
-		// Wait for notification
-		Thread.sleep(50);
-		
-		assertEquals(1, pullPoint.getMessages(0).size());
-
-		// Wait for acks to be processed
-		Thread.sleep(50);
-	}
-
-	public void testPull() throws Exception {
-		PullPoint pullPoint = wsnCreatePullPoint.createPullPoint();
-		wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null);
-		
-		wsnBroker.notify("myTopic", new Notify());
-		// Wait for notification
-		Thread.sleep(50);
-		
-		List<NotificationMessageHolderType> msgs = pullPoint.getMessages(0);
-		assertNotNull(msgs);
-		assertEquals(1, msgs.size());
-
-		// Wait for acks to be processed
-		Thread.sleep(50);
-	}
-	
-	public void testPullWithFilter() throws Exception {
-		PullPoint pullPoint1 = wsnCreatePullPoint.createPullPoint();
-		PullPoint pullPoint2 = wsnCreatePullPoint.createPullPoint();
-		wsnBroker.subscribe(pullPoint1.getEndpoint(), "myTopic", "@type = 'a'");
-		wsnBroker.subscribe(pullPoint2.getEndpoint(), "myTopic", "@type = 'b'");
-		
-		wsnBroker.notify("myTopic", parse("<msg type='a'/>"));
-		// Wait for notification
-		Thread.sleep(500);
-
-		assertEquals(1, pullPoint1.getMessages(0).size());
-		assertEquals(0, pullPoint2.getMessages(0).size());
-		
-		wsnBroker.notify("myTopic", parse("<msg type='b'/>"));
-		// Wait for notification
-		Thread.sleep(500);
-
-		assertEquals(0, pullPoint1.getMessages(0).size());
-		assertEquals(1, pullPoint2.getMessages(0).size());
-
-		wsnBroker.notify("myTopic", parse("<msg type='c'/>"));
-		// Wait for notification
-		Thread.sleep(500);
-
-		assertEquals(0, pullPoint1.getMessages(0).size());
-		assertEquals(0, pullPoint2.getMessages(0).size());
-	}
-	
-	public void testDemandBasedPublisher() throws Exception {
-		PublisherComponent publisherComponent = new PublisherComponent();
-		jbi.activateComponent(publisherComponent, "publisher");
-		
-		Publisher publisher = wsnBroker.registerPublisher(
-									AbstractWSAClient.createWSA(PublisherComponent.SERVICE.getNamespaceURI() + "/" + PublisherComponent.SERVICE.getLocalPart() + "/" + PublisherComponent.ENDPOINT), 
-									"myTopic", true);
-		
-		Thread.sleep(50);
-		assertNull(publisherComponent.getSubscription());
-
-		PullPoint pullPoint = wsnCreatePullPoint.createPullPoint();
-		Subscription subscription = wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null);
-
-		Thread.sleep(500);
-		assertNotNull(publisherComponent.getSubscription());
-		
-		subscription.unsubscribe();
-		
-		Thread.sleep(500);
-		assertNull(publisherComponent.getSubscription());
-		
-		publisher.destroy();
-		
-		Thread.sleep(50);
-	}
-    
-    public void testDeployPullPoint() throws Exception {
-        URL url = getClass().getClassLoader().getResource("pullpoint/pullpoint.xml");
-        File path = new File(new URI(url.toString()));
-        path = path.getParentFile();
-        wsnComponent.getServiceUnitManager().deploy("pullpoint", path.getAbsolutePath());
-
-        wsnComponent.getServiceUnitManager().start("pullpoint");
-        
-        wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
-        PullPoint pullPoint = new PullPoint(AbstractWSAClient.createWSA("http://www.consumer.org/service/endpoint"), 
-                                            jbi);
-        assertEquals(1, pullPoint.getMessages(0).size());
-    }
-        
-    public void testDeploySubscription() throws Exception {
-        URL url = getClass().getClassLoader().getResource("subscription/subscribe.xml");
-        File path = new File(new URI(url.toString()));
-        path = path.getParentFile();
-        wsnComponent.getServiceUnitManager().deploy("subscription", path.getAbsolutePath());
-        
-        ActivationSpec consumer = new ActivationSpec();
-        consumer.setService(new QName("http://www.consumer.org", "service"));
-        consumer.setEndpoint("endpoint");
-        Receiver receiver = new ReceiverComponent();
-        consumer.setComponent(receiver);
-        jbi.activateComponent(consumer);
-        
-        wsnComponent.getServiceUnitManager().start("subscription");
-
-        wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
-        // Wait for notification
-        Thread.sleep(50);
-        receiver.getMessageList().assertMessagesReceived(1);
-        receiver.getMessageList().flushMessages();
-        
-        wsnComponent.getServiceUnitManager().stop("subscription");
-
-        wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
-        // Wait for notification
-        Thread.sleep(50);
-        assertEquals(0, receiver.getMessageList().flushMessages().size());
-        
-        wsnComponent.getServiceUnitManager().start("subscription");
-
-        wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
-        // Wait for notification
-        Thread.sleep(50);
-        receiver.getMessageList().assertMessagesReceived(1);
-        receiver.getMessageList().flushMessages();
-    }
-	
-	protected Element parse(String txt) throws Exception {
-		DocumentBuilder builder = new SourceTransformer().createDocumentBuilder();
-		InputSource is = new InputSource(new StringReader(txt));
-		Document doc = builder.parse(is);
-		return doc.getDocumentElement();
-	}
-	
-	protected EndpointReferenceType createEPR(QName service, String endpoint) {
-		EndpointReferenceType epr = new EndpointReferenceType();
-		epr.setAddress(new AttributedURIType());
-		epr.getAddress().setValue(service.getNamespaceURI() + "/" + service.getLocalPart() + "/" + endpoint);
-		return epr;
-	}
-	
-	public static class PublisherComponent extends ComponentSupport implements MessageExchangeListener {
-	    public static final QName SERVICE = new QName("http://servicemix.org/example", "publisher");
-	    public static final String ENDPOINT = "publisher";
-	    private Object subscription;
-	    public PublisherComponent() {
-	    	super(SERVICE, ENDPOINT);
-	    }
-		public Object getSubscription() {
-			return subscription;
-		}
-		public void onMessageExchange(MessageExchange exchange) throws MessagingException {
-			if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
-				try {
-					JAXBContext jaxbContext = JAXBContext.newInstance(Subscribe.class);
-					Source src = exchange.getMessage("in").getContent();
-					Object input = jaxbContext.createUnmarshaller().unmarshal(src);
-					if (input instanceof Subscribe) {
-						subscription = input;
-						SubscribeResponse response = new SubscribeResponse();
-						response.setSubscriptionReference(AbstractWSAClient.createWSA(PublisherComponent.SERVICE.getNamespaceURI() + "/" + PublisherComponent.SERVICE.getLocalPart() + "/" + PublisherComponent.ENDPOINT));
-						StringWriter writer = new StringWriter();
-						jaxbContext.createMarshaller().marshal(response, writer);
-						NormalizedMessage out = exchange.createMessage();
-						out.setContent(new StringSource(writer.toString()));
-						exchange.setMessage(out, "out");
-						send(exchange);
-					} else if (input instanceof Unsubscribe) {
-						subscription = null;
-						UnsubscribeResponse response = new UnsubscribeResponse();
-						StringWriter writer = new StringWriter();
-						jaxbContext.createMarshaller().marshal(response, writer);
-						NormalizedMessage out = exchange.createMessage();
-						out.setContent(new StringSource(writer.toString()));
-						exchange.setMessage(out, "out");
-						send(exchange);
-					} else {
-						throw new Exception("Unkown request");
-					}
-				} catch (Exception e) {
-					exchange.setError(e);
-					send(exchange);
-				}
-			} else if (exchange.getStatus() == ExchangeStatus.ERROR) {
-				exchange.setStatus(ExchangeStatus.DONE);
-				send(exchange);
-			}
-		}
-	}
-}
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.wsn.component;
+
+import java.io.File;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.net.URI;
+import java.net.URL;
+import java.util.List;
+
+import javax.jbi.JBIException;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.bind.JAXBContext;
+import javax.xml.namespace.QName;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.transform.Source;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.servicemix.MessageExchangeListener;
+import org.apache.servicemix.components.util.ComponentSupport;
+import org.apache.servicemix.jbi.container.ActivationSpec;
+import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.tck.Receiver;
+import org.apache.servicemix.tck.ReceiverComponent;
+import org.apache.servicemix.wsn.client.AbstractWSAClient;
+import org.apache.servicemix.wsn.client.CreatePullPoint;
+import org.apache.servicemix.wsn.client.NotificationBroker;
+import org.apache.servicemix.wsn.client.Publisher;
+import org.apache.servicemix.wsn.client.PullPoint;
+import org.apache.servicemix.wsn.client.Subscription;
+import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
+import org.oasis_open.docs.wsn.b_2.Notify;
+import org.oasis_open.docs.wsn.b_2.Subscribe;
+import org.oasis_open.docs.wsn.b_2.SubscribeResponse;
+import org.oasis_open.docs.wsn.b_2.Unsubscribe;
+import org.oasis_open.docs.wsn.b_2.UnsubscribeResponse;
+import org.w3._2005._08.addressing.AttributedURIType;
+import org.w3._2005._08.addressing.EndpointReferenceType;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.xml.sax.InputSource;
+
+public class WSNComponentTest extends TestCase {
+	
+	public static QName NOTIFICATION_BROKER = new QName("http://servicemix.org/wsnotification", "NotificationBroker"); 
+	
+	private JBIContainer jbi;
+	private BrokerService jmsBroker;
+	private NotificationBroker wsnBroker;
+    private CreatePullPoint wsnCreatePullPoint;
+    private WSNComponent wsnComponent;
+	
+	protected void setUp() throws Exception {
+		jmsBroker = new BrokerService();
+		jmsBroker.setPersistent(false);
+		jmsBroker.addConnector("vm://localhost");
+		jmsBroker.start();
+
+		jbi = new JBIContainer();
+		jbi.setEmbedded(true);
+		jbi.init();
+		jbi.start();
+		
+		wsnComponent = new WSNComponent();
+        wsnComponent.setConnectionFactory(new ActiveMQConnectionFactory("vm://localhost"));
+		ActivationSpec as = new ActivationSpec();
+		as.setComponentName("servicemix-wsn2005");
+		as.setComponent(wsnComponent);
+		jbi.activateComponent(as);
+		
+		wsnBroker = new NotificationBroker(jbi);
+        wsnCreatePullPoint = new CreatePullPoint(jbi);
+	}
+	
+	protected void tearDown() throws Exception {
+		if (jbi != null) {
+			jbi.shutDown();
+		}
+		if (jmsBroker != null) {
+			jmsBroker.stop();
+		}
+	}
+	
+	public void testInvalidSubscribription() throws Exception {
+		try {
+			wsnBroker.subscribe(null, null, null);
+			fail("Expected an exception");
+		} catch (JBIException e) {
+			// ok
+		}
+	}
+	
+	public void testNotify() throws Exception {
+		ReceiverComponent receiver = new ReceiverComponent();
+		jbi.activateComponent(receiver, "receiver");
+		
+		EndpointReferenceType consumer = createEPR(ReceiverComponent.SERVICE, ReceiverComponent.ENDPOINT);
+		wsnBroker.subscribe(consumer, "myTopic", null);
+		
+		wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
+		// Wait for notification
+		Thread.sleep(50);
+		
+		receiver.getMessageList().assertMessagesReceived(1);
+		NormalizedMessage msg = (NormalizedMessage) receiver.getMessageList().getMessages().get(0);
+		Node node = new SourceTransformer().toDOMNode(msg);
+		assertEquals("Notify", node.getLocalName());
+		
+		// Wait for acks to be processed
+		Thread.sleep(50);
+	}
+
+	public void testRawNotify() throws Exception {
+		ReceiverComponent receiver = new ReceiverComponent();
+		jbi.activateComponent(receiver, "receiver");
+		
+		EndpointReferenceType consumer = createEPR(ReceiverComponent.SERVICE, ReceiverComponent.ENDPOINT);
+		wsnBroker.subscribe(consumer, "myTopic", null, true);
+		
+		wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
+		// Wait for notification
+		Thread.sleep(50);
+		
+		receiver.getMessageList().assertMessagesReceived(1);
+		NormalizedMessage msg = (NormalizedMessage) receiver.getMessageList().getMessages().get(0);
+		Node node = new SourceTransformer().toDOMNode(msg);
+		assertEquals("hello", node.getLocalName());
+		
+		// Wait for acks to be processed
+		Thread.sleep(50);
+	}
+
+	public void testUnsubscribe() throws Exception {
+		PullPoint pullPoint = wsnCreatePullPoint.createPullPoint();
+		Subscription subscription = wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null);
+		
+		wsnBroker.notify("myTopic", new Notify());
+		// Wait for notification
+		Thread.sleep(50);
+		
+		assertEquals(1, pullPoint.getMessages(0).size());
+
+		subscription.unsubscribe();
+		
+		wsnBroker.notify("myTopic", new Notify());
+		// Wait for notification
+		Thread.sleep(50);
+		
+		assertEquals(0, pullPoint.getMessages(0).size());
+
+		// Wait for acks to be processed
+		Thread.sleep(50);
+	}
+
+	public void testPauseResume() throws Exception {
+		PullPoint pullPoint = wsnCreatePullPoint.createPullPoint();
+		Subscription subscription = wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null);
+		
+		wsnBroker.notify("myTopic", new Notify());
+		// Wait for notification
+		Thread.sleep(50);
+		
+		assertEquals(1, pullPoint.getMessages(0).size());
+
+		subscription.pause();
+		
+		wsnBroker.notify("myTopic", new Notify());
+		// Wait for notification
+		Thread.sleep(50);
+		
+		assertEquals(0, pullPoint.getMessages(0).size());
+
+		subscription.resume();
+		
+		wsnBroker.notify("myTopic", new Notify());
+		// Wait for notification
+		Thread.sleep(50);
+		
+		assertEquals(1, pullPoint.getMessages(0).size());
+
+		// Wait for acks to be processed
+		Thread.sleep(50);
+	}
+
+	public void testPull() throws Exception {
+		PullPoint pullPoint = wsnCreatePullPoint.createPullPoint();
+		wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null);
+		
+		wsnBroker.notify("myTopic", new Notify());
+		// Wait for notification
+		Thread.sleep(50);
+		
+		List<NotificationMessageHolderType> msgs = pullPoint.getMessages(0);
+		assertNotNull(msgs);
+		assertEquals(1, msgs.size());
+
+		// Wait for acks to be processed
+		Thread.sleep(50);
+	}
+	
+	public void testPullWithFilter() throws Exception {
+		PullPoint pullPoint1 = wsnCreatePullPoint.createPullPoint();
+		PullPoint pullPoint2 = wsnCreatePullPoint.createPullPoint();
+		wsnBroker.subscribe(pullPoint1.getEndpoint(), "myTopic", "@type = 'a'");
+		wsnBroker.subscribe(pullPoint2.getEndpoint(), "myTopic", "@type = 'b'");
+		
+		wsnBroker.notify("myTopic", parse("<msg type='a'/>"));
+		// Wait for notification
+		Thread.sleep(500);
+
+		assertEquals(1, pullPoint1.getMessages(0).size());
+		assertEquals(0, pullPoint2.getMessages(0).size());
+		
+		wsnBroker.notify("myTopic", parse("<msg type='b'/>"));
+		// Wait for notification
+		Thread.sleep(500);
+
+		assertEquals(0, pullPoint1.getMessages(0).size());
+		assertEquals(1, pullPoint2.getMessages(0).size());
+
+		wsnBroker.notify("myTopic", parse("<msg type='c'/>"));
+		// Wait for notification
+		Thread.sleep(500);
+
+		assertEquals(0, pullPoint1.getMessages(0).size());
+		assertEquals(0, pullPoint2.getMessages(0).size());
+	}
+	
+	public void testDemandBasedPublisher() throws Exception {
+		PublisherComponent publisherComponent = new PublisherComponent();
+		jbi.activateComponent(publisherComponent, "publisher");
+		
+		Publisher publisher = wsnBroker.registerPublisher(
+									AbstractWSAClient.createWSA(PublisherComponent.SERVICE.getNamespaceURI() + "/" + PublisherComponent.SERVICE.getLocalPart() + "/" + PublisherComponent.ENDPOINT), 
+									"myTopic", true);
+		
+		Thread.sleep(50);
+		assertNull(publisherComponent.getSubscription());
+
+		PullPoint pullPoint = wsnCreatePullPoint.createPullPoint();
+		Subscription subscription = wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null);
+
+		Thread.sleep(500);
+		assertNotNull(publisherComponent.getSubscription());
+		
+		subscription.unsubscribe();
+		
+		Thread.sleep(500);
+		assertNull(publisherComponent.getSubscription());
+		
+		publisher.destroy();
+		
+		Thread.sleep(50);
+	}
+    
+    public void testDeployPullPoint() throws Exception {
+        URL url = getClass().getClassLoader().getResource("pullpoint/pullpoint.xml");
+        File path = new File(new URI(url.toString()));
+        path = path.getParentFile();
+        wsnComponent.getServiceUnitManager().deploy("pullpoint", path.getAbsolutePath());
+
+        wsnComponent.getServiceUnitManager().start("pullpoint");
+        
+        wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
+        PullPoint pullPoint = new PullPoint(AbstractWSAClient.createWSA("http://www.consumer.org/service/endpoint"), 
+                                            jbi);
+        assertEquals(1, pullPoint.getMessages(0).size());
+    }
+        
+    public void testDeploySubscription() throws Exception {
+        URL url = getClass().getClassLoader().getResource("subscription/subscribe.xml");
+        File path = new File(new URI(url.toString()));
+        path = path.getParentFile();
+        wsnComponent.getServiceUnitManager().deploy("subscription", path.getAbsolutePath());
+        
+        ActivationSpec consumer = new ActivationSpec();
+        consumer.setService(new QName("http://www.consumer.org", "service"));
+        consumer.setEndpoint("endpoint");
+        Receiver receiver = new ReceiverComponent();
+        consumer.setComponent(receiver);
+        jbi.activateComponent(consumer);
+        
+        wsnComponent.getServiceUnitManager().start("subscription");
+
+        wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
+        // Wait for notification
+        Thread.sleep(50);
+        receiver.getMessageList().assertMessagesReceived(1);
+        receiver.getMessageList().flushMessages();
+        
+        wsnComponent.getServiceUnitManager().stop("subscription");
+
+        wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
+        // Wait for notification
+        Thread.sleep(50);
+        assertEquals(0, receiver.getMessageList().flushMessages().size());
+        
+        wsnComponent.getServiceUnitManager().start("subscription");
+
+        wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
+        // Wait for notification
+        Thread.sleep(50);
+        receiver.getMessageList().assertMessagesReceived(1);
+        receiver.getMessageList().flushMessages();
+    }
+	
+	protected Element parse(String txt) throws Exception {
+		DocumentBuilder builder = new SourceTransformer().createDocumentBuilder();
+		InputSource is = new InputSource(new StringReader(txt));
+		Document doc = builder.parse(is);
+		return doc.getDocumentElement();
+	}
+	
+	protected EndpointReferenceType createEPR(QName service, String endpoint) {
+		EndpointReferenceType epr = new EndpointReferenceType();
+		epr.setAddress(new AttributedURIType());
+		epr.getAddress().setValue(service.getNamespaceURI() + "/" + service.getLocalPart() + "/" + endpoint);
+		return epr;
+	}
+	
+	public static class PublisherComponent extends ComponentSupport implements MessageExchangeListener {
+	    public static final QName SERVICE = new QName("http://servicemix.org/example", "publisher");
+	    public static final String ENDPOINT = "publisher";
+	    private Object subscription;
+	    public PublisherComponent() {
+	    	super(SERVICE, ENDPOINT);
+	    }
+		public Object getSubscription() {
+			return subscription;
+		}
+		public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+			if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+				try {
+					JAXBContext jaxbContext = JAXBContext.newInstance(Subscribe.class);
+					Source src = exchange.getMessage("in").getContent();
+					Object input = jaxbContext.createUnmarshaller().unmarshal(src);
+					if (input instanceof Subscribe) {
+						subscription = input;
+						SubscribeResponse response = new SubscribeResponse();
+						response.setSubscriptionReference(AbstractWSAClient.createWSA(PublisherComponent.SERVICE.getNamespaceURI() + "/" + PublisherComponent.SERVICE.getLocalPart() + "/" + PublisherComponent.ENDPOINT));
+						StringWriter writer = new StringWriter();
+						jaxbContext.createMarshaller().marshal(response, writer);
+						NormalizedMessage out = exchange.createMessage();
+						out.setContent(new StringSource(writer.toString()));
+						exchange.setMessage(out, "out");
+						send(exchange);
+					} else if (input instanceof Unsubscribe) {
+						subscription = null;
+						UnsubscribeResponse response = new UnsubscribeResponse();
+						StringWriter writer = new StringWriter();
+						jaxbContext.createMarshaller().marshal(response, writer);
+						NormalizedMessage out = exchange.createMessage();
+						out.setContent(new StringSource(writer.toString()));
+						exchange.setMessage(out, "out");
+						send(exchange);
+					} else {
+						throw new Exception("Unkown request");
+					}
+				} catch (Exception e) {
+					exchange.setError(e);
+					send(exchange);
+				}
+			} else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+				exchange.setStatus(ExchangeStatus.DONE);
+				send(exchange);
+			}
+		}
+	}
+}

Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/component/WSNComponentTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/log4j-tests.properties
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/log4j-tests.properties?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/log4j-tests.properties (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/log4j-tests.properties Tue Feb 21 15:40:05 2006
@@ -1,21 +1,21 @@
-#
-# The logging properties used during tests..
-#
-log4j.rootLogger=DEBUG, out
-
-log4j.logger.org.apache.activemq=INFO
-log4j.logger.org.apache.activemq.spring=WARN
-log4j.logger.org.apache.activemq.store.journal=INFO
-log4j.logger.org.activeio.journal=INFO
-
-# CONSOLE appender not used by default
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
-
-# File appender
-log4j.appender.out=org.apache.log4j.FileAppender
-log4j.appender.out.layout=org.apache.log4j.PatternLayout
-log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
-log4j.appender.out.file=target/servicemix-test.log
-log4j.appender.out.append=true
+#
+# The logging properties used during tests..
+#
+log4j.rootLogger=DEBUG, out
+
+log4j.logger.org.apache.activemq=INFO
+log4j.logger.org.apache.activemq.spring=WARN
+log4j.logger.org.apache.activemq.store.journal=INFO
+log4j.logger.org.activeio.journal=INFO
+
+# CONSOLE appender not used by default
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+# File appender
+log4j.appender.out=org.apache.log4j.FileAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.out.file=target/servicemix-test.log
+log4j.appender.out.append=true

Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/log4j-tests.properties
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/log4j.properties?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/log4j.properties (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/log4j.properties Tue Feb 21 15:40:05 2006
@@ -1,21 +1,21 @@
-#
-# The logging properties used during tests..
-#
-log4j.rootLogger=DEBUG, stdout
-
-log4j.logger.org.apache.activemq=INFO
-log4j.logger.org.apache.activemq.spring=WARN
-log4j.logger.org.apache.activemq.store.journal=INFO
-log4j.logger.org.activeio.journal=INFO
-
-# CONSOLE appender not used by default
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
-
-# File appender
-log4j.appender.out=org.apache.log4j.FileAppender
-log4j.appender.out.layout=org.apache.log4j.PatternLayout
-log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
-log4j.appender.out.file=target/servicemix-test.log
-log4j.appender.out.append=true
+#
+# The logging properties used during tests..
+#
+log4j.rootLogger=DEBUG, stdout
+
+log4j.logger.org.apache.activemq=INFO
+log4j.logger.org.apache.activemq.spring=WARN
+log4j.logger.org.apache.activemq.store.journal=INFO
+log4j.logger.org.activeio.journal=INFO
+
+# CONSOLE appender not used by default
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+# File appender
+log4j.appender.out=org.apache.log4j.FileAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.out.file=target/servicemix-test.log
+log4j.appender.out.append=true

Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/log4j.properties
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/org/apache/servicemix/wsn/subscribe-abs-itt.xml
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/org/apache/servicemix/wsn/subscribe-abs-itt.xml?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/org/apache/servicemix/wsn/subscribe-abs-itt.xml (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/org/apache/servicemix/wsn/subscribe-abs-itt.xml Tue Feb 21 15:40:05 2006
@@ -1,23 +1,23 @@
-<?xml version="1.0"?>
-<wsnt:Subscribe
-  xmlns:wsnt="http://docs.oasis-open.org/wsn/b-2"
-  xmlns:wsa="http://www.w3.org/2005/08/addressing"
-  xmlns:ncex="http://www.consumer.org"
-  xmlns:npex="http://www.producer.org">
-  <wsnt:ConsumerReference>
-    <wsa:Address>
-      http://www.consumer.org/ConsumerEndpoint
-    </wsa:Address>
-  </wsnt:ConsumerReference>
-  <wsnt:Filter>
-    <wsnt:TopicExpression Dialect="http://docs.oasis-open.org/wsn/t-1/TopicExpression/Simple">
-      npex:SomeTopic
-    </wsnt:TopicExpression>
-    <wsnt:MessageContent Dialect="http://www.w3.org/TR/1999/REC-xpath-19991116">
-      boolean(ncex:Producer="15")
-    </wsnt:MessageContent>
-  </wsnt:Filter>
-  <wsnt:InitialTerminationTime>
-    2007-12-25T00:00:00.00000Z
-  </wsnt:InitialTerminationTime>
+<?xml version="1.0"?>
+<wsnt:Subscribe
+  xmlns:wsnt="http://docs.oasis-open.org/wsn/b-2"
+  xmlns:wsa="http://www.w3.org/2005/08/addressing"
+  xmlns:ncex="http://www.consumer.org"
+  xmlns:npex="http://www.producer.org">
+  <wsnt:ConsumerReference>
+    <wsa:Address>
+      http://www.consumer.org/ConsumerEndpoint
+    </wsa:Address>
+  </wsnt:ConsumerReference>
+  <wsnt:Filter>
+    <wsnt:TopicExpression Dialect="http://docs.oasis-open.org/wsn/t-1/TopicExpression/Simple">
+      npex:SomeTopic
+    </wsnt:TopicExpression>
+    <wsnt:MessageContent Dialect="http://www.w3.org/TR/1999/REC-xpath-19991116">
+      boolean(ncex:Producer="15")
+    </wsnt:MessageContent>
+  </wsnt:Filter>
+  <wsnt:InitialTerminationTime>
+    2007-12-25T00:00:00.00000Z
+  </wsnt:InitialTerminationTime>
 </wsnt:Subscribe>

Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/org/apache/servicemix/wsn/subscribe-abs-itt.xml
------------------------------------------------------------------------------
    svn:eol-style = native