You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ch...@apache.org on 2006/02/22 00:40:29 UTC
svn commit: r379627 [25/34] - in /incubator/servicemix/trunk: ./ etc/
sandbox/servicemix-wsn-1.2/src/sa/META-INF/
sandbox/servicemix-wsn-1.2/src/su/META-INF/ servicemix-assembly/
servicemix-assembly/src/main/assembly/ servicemix-assembly/src/main/relea...
Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsPullPoint.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsPullPoint.java?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsPullPoint.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsPullPoint.java Tue Feb 21 15:40:05 2006
@@ -1,145 +1,145 @@
-/*
- * Copyright 2005-2006 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicemix.wsn.jms;
-
-import java.io.StringReader;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.servicemix.wsn.AbstractPullPoint;
-import org.apache.servicemix.wsn.jaxws.ResourceUnknownFault;
-import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
-import org.oasis_open.docs.wsn.b_2.Notify;
-import org.oasis_open.docs.wsrf.r_2.ResourceUnknownFaultType;
-
-public class JmsPullPoint extends AbstractPullPoint {
-
- private static Log log = LogFactory.getLog(JmsPullPoint.class);
-
- private JAXBContext jaxbContext;
- private Connection connection;
- private Session session;
- private Queue queue;
- private MessageProducer producer;
- private MessageConsumer consumer;
-
- public JmsPullPoint(String name) {
- super(name);
- try {
- jaxbContext = JAXBContext.newInstance(Notify.class);
- } catch (JAXBException e) {
- throw new RuntimeException("Could not create PullEndpoint", e);
- }
- }
-
- protected void initSession() throws JMSException {
- if (session == null) {
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- queue = session.createQueue(getName());
- producer = session.createProducer(queue);
- consumer = session.createConsumer(queue);
- }
- }
-
- @Override
- protected synchronized void store(NotificationMessageHolderType messageHolder) {
- try {
- initSession();
- Notify notify = new Notify();
- notify.getNotificationMessage().add(messageHolder);
- StringWriter writer = new StringWriter();
- jaxbContext.createMarshaller().marshal(notify, writer);
- Message message = session.createTextMessage(writer.toString());
- producer.send(message);
- } catch (JMSException e) {
- log.warn("Error storing message", e);
- if (session != null) {
- try {
- session.close();
- } catch (JMSException inner) {
- log.debug("Error closing session", inner);
- } finally {
- session = null;
- }
- }
- } catch (JAXBException e) {
- log.warn("Error storing message", e);
- }
- }
-
- @Override
- protected synchronized List<NotificationMessageHolderType> getMessages(int max) throws ResourceUnknownFault {
- Session session = null;
- try {
- if (max == 0) {
- max = 256;
- }
- initSession();
- List<NotificationMessageHolderType> messages = new ArrayList<NotificationMessageHolderType>();
- for (int i = 0; i < max; i++) {
- Message msg = consumer.receiveNoWait();
- if (msg == null) {
- break;
- }
- TextMessage txtMsg = (TextMessage) msg;
- StringReader reader = new StringReader(txtMsg.getText());
- Notify notify = (Notify) jaxbContext.createUnmarshaller().unmarshal(reader);
- messages.addAll(notify.getNotificationMessage());
- }
- return messages;
- } catch (JMSException e) {
- log.info("Error retrieving messages", e);
- if (session != null) {
- try {
- session.close();
- } catch (JMSException inner) {
- log.debug("Error closing session", inner);
- } finally {
- session = null;
- }
- }
- ResourceUnknownFaultType fault = new ResourceUnknownFaultType();
- throw new ResourceUnknownFault("Unable to retrieve messages", fault, e);
- } catch (JAXBException e) {
- log.info("Error retrieving messages", e);
- ResourceUnknownFaultType fault = new ResourceUnknownFaultType();
- throw new ResourceUnknownFault("Unable to retrieve messages", fault, e);
- }
- }
-
- public Connection getConnection() {
- return connection;
- }
-
- public void setConnection(Connection connection) {
- this.connection = connection;
- }
-
-}
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.wsn.jms;
+
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.wsn.AbstractPullPoint;
+import org.apache.servicemix.wsn.jaxws.ResourceUnknownFault;
+import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
+import org.oasis_open.docs.wsn.b_2.Notify;
+import org.oasis_open.docs.wsrf.r_2.ResourceUnknownFaultType;
+
+public class JmsPullPoint extends AbstractPullPoint {
+
+ private static Log log = LogFactory.getLog(JmsPullPoint.class);
+
+ private JAXBContext jaxbContext;
+ private Connection connection;
+ private Session session;
+ private Queue queue;
+ private MessageProducer producer;
+ private MessageConsumer consumer;
+
+ public JmsPullPoint(String name) {
+ super(name);
+ try {
+ jaxbContext = JAXBContext.newInstance(Notify.class);
+ } catch (JAXBException e) {
+ throw new RuntimeException("Could not create PullEndpoint", e);
+ }
+ }
+
+ protected void initSession() throws JMSException {
+ if (session == null) {
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ queue = session.createQueue(getName());
+ producer = session.createProducer(queue);
+ consumer = session.createConsumer(queue);
+ }
+ }
+
+ @Override
+ protected synchronized void store(NotificationMessageHolderType messageHolder) {
+ try {
+ initSession();
+ Notify notify = new Notify();
+ notify.getNotificationMessage().add(messageHolder);
+ StringWriter writer = new StringWriter();
+ jaxbContext.createMarshaller().marshal(notify, writer);
+ Message message = session.createTextMessage(writer.toString());
+ producer.send(message);
+ } catch (JMSException e) {
+ log.warn("Error storing message", e);
+ if (session != null) {
+ try {
+ session.close();
+ } catch (JMSException inner) {
+ log.debug("Error closing session", inner);
+ } finally {
+ session = null;
+ }
+ }
+ } catch (JAXBException e) {
+ log.warn("Error storing message", e);
+ }
+ }
+
+ @Override
+ protected synchronized List<NotificationMessageHolderType> getMessages(int max) throws ResourceUnknownFault {
+ Session session = null;
+ try {
+ if (max == 0) {
+ max = 256;
+ }
+ initSession();
+ List<NotificationMessageHolderType> messages = new ArrayList<NotificationMessageHolderType>();
+ for (int i = 0; i < max; i++) {
+ Message msg = consumer.receiveNoWait();
+ if (msg == null) {
+ break;
+ }
+ TextMessage txtMsg = (TextMessage) msg;
+ StringReader reader = new StringReader(txtMsg.getText());
+ Notify notify = (Notify) jaxbContext.createUnmarshaller().unmarshal(reader);
+ messages.addAll(notify.getNotificationMessage());
+ }
+ return messages;
+ } catch (JMSException e) {
+ log.info("Error retrieving messages", e);
+ if (session != null) {
+ try {
+ session.close();
+ } catch (JMSException inner) {
+ log.debug("Error closing session", inner);
+ } finally {
+ session = null;
+ }
+ }
+ ResourceUnknownFaultType fault = new ResourceUnknownFaultType();
+ throw new ResourceUnknownFault("Unable to retrieve messages", fault, e);
+ } catch (JAXBException e) {
+ log.info("Error retrieving messages", e);
+ ResourceUnknownFaultType fault = new ResourceUnknownFaultType();
+ throw new ResourceUnknownFault("Unable to retrieve messages", fault, e);
+ }
+ }
+
+ public Connection getConnection() {
+ return connection;
+ }
+
+ public void setConnection(Connection connection) {
+ this.connection = connection;
+ }
+
+}
Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsPullPoint.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsSubscription.java?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsSubscription.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsSubscription.java Tue Feb 21 15:40:05 2006
@@ -1,213 +1,213 @@
-/*
- * Copyright 2005-2006 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicemix.wsn.jms;
-
-import java.io.StringReader;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.xml.datatype.XMLGregorianCalendar;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.xpath.XPath;
-import javax.xml.xpath.XPathConstants;
-import javax.xml.xpath.XPathExpression;
-import javax.xml.xpath.XPathExpressionException;
-import javax.xml.xpath.XPathFactory;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.servicemix.wsn.AbstractSubscription;
-import org.apache.servicemix.wsn.jaxws.InvalidFilterFault;
-import org.apache.servicemix.wsn.jaxws.InvalidMessageContentExpressionFault;
-import org.apache.servicemix.wsn.jaxws.InvalidProducerPropertiesExpressionFault;
-import org.apache.servicemix.wsn.jaxws.InvalidTopicExpressionFault;
-import org.apache.servicemix.wsn.jaxws.PauseFailedFault;
-import org.apache.servicemix.wsn.jaxws.ResumeFailedFault;
-import org.apache.servicemix.wsn.jaxws.SubscribeCreationFailedFault;
-import org.apache.servicemix.wsn.jaxws.TopicExpressionDialectUnknownFault;
-import org.apache.servicemix.wsn.jaxws.TopicNotSupportedFault;
-import org.apache.servicemix.wsn.jaxws.UnableToDestroySubscriptionFault;
-import org.apache.servicemix.wsn.jaxws.UnacceptableInitialTerminationTimeFault;
-import org.apache.servicemix.wsn.jaxws.UnacceptableTerminationTimeFault;
-import org.oasis_open.docs.wsn.b_2.InvalidTopicExpressionFaultType;
-import org.oasis_open.docs.wsn.b_2.PauseFailedFaultType;
-import org.oasis_open.docs.wsn.b_2.ResumeFailedFaultType;
-import org.oasis_open.docs.wsn.b_2.Subscribe;
-import org.oasis_open.docs.wsn.b_2.SubscribeCreationFailedFaultType;
-import org.oasis_open.docs.wsn.b_2.UnableToDestroySubscriptionFaultType;
-import org.oasis_open.docs.wsn.b_2.UnacceptableTerminationTimeFaultType;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.xml.sax.InputSource;
-
-public abstract class JmsSubscription extends AbstractSubscription implements MessageListener {
-
- private static Log log = LogFactory.getLog(JmsSubscription.class);
-
- private Connection connection;
- private Session session;
- private JmsTopicExpressionConverter topicConverter;
- private Topic jmsTopic;
-
- public JmsSubscription(String name) {
- super(name);
- topicConverter = new JmsTopicExpressionConverter();
- }
-
- protected void start() throws SubscribeCreationFailedFault {
- try {
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(jmsTopic);
- consumer.setMessageListener(this);
- } catch (JMSException e) {
- SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType();
- throw new SubscribeCreationFailedFault("Error starting subscription", fault, e);
- }
- }
-
- @Override
- protected void validateSubscription(Subscribe subscribeRequest) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
- super.validateSubscription(subscribeRequest);
- try {
- jmsTopic = topicConverter.toActiveMQTopic(topic);
- } catch (InvalidTopicException e) {
- InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType();
- throw new InvalidTopicExpressionFault(e.getMessage(), fault);
- }
- }
-
- @Override
- protected void pause() throws PauseFailedFault {
- if (session == null) {
- PauseFailedFaultType fault = new PauseFailedFaultType();
- throw new PauseFailedFault("Subscription is already paused", fault);
- } else {
- try {
- session.close();
- } catch (JMSException e) {
- PauseFailedFaultType fault = new PauseFailedFaultType();
- throw new PauseFailedFault("Error pausing subscription", fault, e);
- } finally {
- session = null;
- }
- }
- }
-
- @Override
- protected void resume() throws ResumeFailedFault {
- if (session != null) {
- ResumeFailedFaultType fault = new ResumeFailedFaultType();
- throw new ResumeFailedFault("Subscription is already running", fault);
- } else {
- try {
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(jmsTopic);
- consumer.setMessageListener(this);
- } catch (JMSException e) {
- ResumeFailedFaultType fault = new ResumeFailedFaultType();
- throw new ResumeFailedFault("Error resuming subscription", fault, e);
- }
- }
- }
-
- @Override
- protected void renew(XMLGregorianCalendar terminationTime) throws UnacceptableTerminationTimeFault {
- UnacceptableTerminationTimeFaultType fault = new UnacceptableTerminationTimeFaultType();
- throw new UnacceptableTerminationTimeFault(
- "TerminationTime is not supported",
- fault);
- }
-
- @Override
- protected void unsubscribe() throws UnableToDestroySubscriptionFault {
- super.unsubscribe();
- if (session != null) {
- try {
- session.close();
- } catch (JMSException e) {
- UnableToDestroySubscriptionFaultType fault = new UnableToDestroySubscriptionFaultType();
- throw new UnableToDestroySubscriptionFault("Unable to unsubscribe", fault, e);
- } finally {
- session = null;
- }
- }
- }
-
- public Connection getConnection() {
- return connection;
- }
-
- public void setConnection(Connection connection) {
- this.connection = connection;
- }
-
- public void onMessage(Message jmsMessage) {
- try {
- TextMessage text = (TextMessage) jmsMessage;
- DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
- factory.setNamespaceAware(true);
- Document doc = factory.newDocumentBuilder().parse(new InputSource(new StringReader(text.getText())));
- Element root = doc.getDocumentElement();
- Element holder = (Element) root.getElementsByTagNameNS(WSN_URI, "NotificationMessage").item(0);
- Element message = (Element) holder.getElementsByTagNameNS(WSN_URI, "Message").item(0);
- Element content = null;
- for (int i = 0; i < message.getChildNodes().getLength(); i++) {
- if (message.getChildNodes().item(i) instanceof Element) {
- content = (Element) message.getChildNodes().item(i);
- break;
- }
- }
- boolean match = doFilter(content);
- if (match) {
- if (useRaw) {
- doNotify(content);
- } else {
- doNotify(root);
- }
- }
- } catch (Exception e) {
- log.warn("Error notifying consumer", e);
- }
- }
-
- protected boolean doFilter(Element content) {
- if (contentFilter != null) {
- if (!contentFilter.getDialect().equals(XPATH1_URI)) {
- throw new IllegalStateException("Unsupported dialect: " + contentFilter.getDialect());
- }
- try {
- XPathFactory xpfactory = XPathFactory.newInstance();
- XPath xpath = xpfactory.newXPath();
- XPathExpression exp = xpath.compile(contentFilter.getContent().get(0).toString());
- Boolean ret = (Boolean) exp.evaluate(content, XPathConstants.BOOLEAN);
- return ret.booleanValue();
- } catch (XPathExpressionException e) {
- log.warn("Could not filter notification", e);
- }
- return false;
- }
- return true;
- }
-
- protected abstract void doNotify(Element content);
-
-}
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.wsn.jms;
+
+import java.io.StringReader;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.xml.datatype.XMLGregorianCalendar;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpression;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.wsn.AbstractSubscription;
+import org.apache.servicemix.wsn.jaxws.InvalidFilterFault;
+import org.apache.servicemix.wsn.jaxws.InvalidMessageContentExpressionFault;
+import org.apache.servicemix.wsn.jaxws.InvalidProducerPropertiesExpressionFault;
+import org.apache.servicemix.wsn.jaxws.InvalidTopicExpressionFault;
+import org.apache.servicemix.wsn.jaxws.PauseFailedFault;
+import org.apache.servicemix.wsn.jaxws.ResumeFailedFault;
+import org.apache.servicemix.wsn.jaxws.SubscribeCreationFailedFault;
+import org.apache.servicemix.wsn.jaxws.TopicExpressionDialectUnknownFault;
+import org.apache.servicemix.wsn.jaxws.TopicNotSupportedFault;
+import org.apache.servicemix.wsn.jaxws.UnableToDestroySubscriptionFault;
+import org.apache.servicemix.wsn.jaxws.UnacceptableInitialTerminationTimeFault;
+import org.apache.servicemix.wsn.jaxws.UnacceptableTerminationTimeFault;
+import org.oasis_open.docs.wsn.b_2.InvalidTopicExpressionFaultType;
+import org.oasis_open.docs.wsn.b_2.PauseFailedFaultType;
+import org.oasis_open.docs.wsn.b_2.ResumeFailedFaultType;
+import org.oasis_open.docs.wsn.b_2.Subscribe;
+import org.oasis_open.docs.wsn.b_2.SubscribeCreationFailedFaultType;
+import org.oasis_open.docs.wsn.b_2.UnableToDestroySubscriptionFaultType;
+import org.oasis_open.docs.wsn.b_2.UnacceptableTerminationTimeFaultType;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.xml.sax.InputSource;
+
+public abstract class JmsSubscription extends AbstractSubscription implements MessageListener {
+
+ private static Log log = LogFactory.getLog(JmsSubscription.class);
+
+ private Connection connection;
+ private Session session;
+ private JmsTopicExpressionConverter topicConverter;
+ private Topic jmsTopic;
+
+ public JmsSubscription(String name) {
+ super(name);
+ topicConverter = new JmsTopicExpressionConverter();
+ }
+
+ protected void start() throws SubscribeCreationFailedFault {
+ try {
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(jmsTopic);
+ consumer.setMessageListener(this);
+ } catch (JMSException e) {
+ SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType();
+ throw new SubscribeCreationFailedFault("Error starting subscription", fault, e);
+ }
+ }
+
+ @Override
+ protected void validateSubscription(Subscribe subscribeRequest) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
+ super.validateSubscription(subscribeRequest);
+ try {
+ jmsTopic = topicConverter.toActiveMQTopic(topic);
+ } catch (InvalidTopicException e) {
+ InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType();
+ throw new InvalidTopicExpressionFault(e.getMessage(), fault);
+ }
+ }
+
+ @Override
+ protected void pause() throws PauseFailedFault {
+ if (session == null) {
+ PauseFailedFaultType fault = new PauseFailedFaultType();
+ throw new PauseFailedFault("Subscription is already paused", fault);
+ } else {
+ try {
+ session.close();
+ } catch (JMSException e) {
+ PauseFailedFaultType fault = new PauseFailedFaultType();
+ throw new PauseFailedFault("Error pausing subscription", fault, e);
+ } finally {
+ session = null;
+ }
+ }
+ }
+
+ @Override
+ protected void resume() throws ResumeFailedFault {
+ if (session != null) {
+ ResumeFailedFaultType fault = new ResumeFailedFaultType();
+ throw new ResumeFailedFault("Subscription is already running", fault);
+ } else {
+ try {
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(jmsTopic);
+ consumer.setMessageListener(this);
+ } catch (JMSException e) {
+ ResumeFailedFaultType fault = new ResumeFailedFaultType();
+ throw new ResumeFailedFault("Error resuming subscription", fault, e);
+ }
+ }
+ }
+
+ @Override
+ protected void renew(XMLGregorianCalendar terminationTime) throws UnacceptableTerminationTimeFault {
+ UnacceptableTerminationTimeFaultType fault = new UnacceptableTerminationTimeFaultType();
+ throw new UnacceptableTerminationTimeFault(
+ "TerminationTime is not supported",
+ fault);
+ }
+
+ @Override
+ protected void unsubscribe() throws UnableToDestroySubscriptionFault {
+ super.unsubscribe();
+ if (session != null) {
+ try {
+ session.close();
+ } catch (JMSException e) {
+ UnableToDestroySubscriptionFaultType fault = new UnableToDestroySubscriptionFaultType();
+ throw new UnableToDestroySubscriptionFault("Unable to unsubscribe", fault, e);
+ } finally {
+ session = null;
+ }
+ }
+ }
+
+ public Connection getConnection() {
+ return connection;
+ }
+
+ public void setConnection(Connection connection) {
+ this.connection = connection;
+ }
+
+ public void onMessage(Message jmsMessage) {
+ try {
+ TextMessage text = (TextMessage) jmsMessage;
+ DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+ factory.setNamespaceAware(true);
+ Document doc = factory.newDocumentBuilder().parse(new InputSource(new StringReader(text.getText())));
+ Element root = doc.getDocumentElement();
+ Element holder = (Element) root.getElementsByTagNameNS(WSN_URI, "NotificationMessage").item(0);
+ Element message = (Element) holder.getElementsByTagNameNS(WSN_URI, "Message").item(0);
+ Element content = null;
+ for (int i = 0; i < message.getChildNodes().getLength(); i++) {
+ if (message.getChildNodes().item(i) instanceof Element) {
+ content = (Element) message.getChildNodes().item(i);
+ break;
+ }
+ }
+ boolean match = doFilter(content);
+ if (match) {
+ if (useRaw) {
+ doNotify(content);
+ } else {
+ doNotify(root);
+ }
+ }
+ } catch (Exception e) {
+ log.warn("Error notifying consumer", e);
+ }
+ }
+
+ protected boolean doFilter(Element content) {
+ if (contentFilter != null) {
+ if (!contentFilter.getDialect().equals(XPATH1_URI)) {
+ throw new IllegalStateException("Unsupported dialect: " + contentFilter.getDialect());
+ }
+ try {
+ XPathFactory xpfactory = XPathFactory.newInstance();
+ XPath xpath = xpfactory.newXPath();
+ XPathExpression exp = xpath.compile(contentFilter.getContent().get(0).toString());
+ Boolean ret = (Boolean) exp.evaluate(content, XPathConstants.BOOLEAN);
+ return ret.booleanValue();
+ } catch (XPathExpressionException e) {
+ log.warn("Could not filter notification", e);
+ }
+ return false;
+ }
+ return true;
+ }
+
+ protected abstract void doNotify(Element content);
+
+}
Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsSubscription.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsTopicExpressionConverter.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsTopicExpressionConverter.java?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsTopicExpressionConverter.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsTopicExpressionConverter.java Tue Feb 21 15:40:05 2006
@@ -1,93 +1,93 @@
-/*
- * Copyright 2005-2006 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicemix.wsn.jms;
-
-import java.util.Iterator;
-import java.util.List;
-
-import javax.jms.Topic;
-import javax.xml.namespace.QName;
-
-import org.apache.activemq.command.ActiveMQTopic;
-import org.oasis_open.docs.wsn.b_2.TopicExpressionType;
-
-public class JmsTopicExpressionConverter {
-
- public static final String SIMPLE_DIALECT = "http://docs.oasis-open.org/wsn/t-1/TopicExpression/Simple";
-
- public TopicExpressionType toTopicExpression(Topic topic) {
- return toTopicExpression(topic.toString());
- }
-
- public TopicExpressionType toTopicExpression(ActiveMQTopic topic) {
- return toTopicExpression(topic.getPhysicalName());
- }
-
- public TopicExpressionType toTopicExpression(String name) {
- TopicExpressionType answer = new TopicExpressionType();
- answer.getContent().add(QName.valueOf(name));
- answer.setDialect(SIMPLE_DIALECT);
- return answer;
- }
-
- public ActiveMQTopic toActiveMQTopic(List<TopicExpressionType> topics) throws InvalidTopicException {
- if (topics == null || topics.size() == 0) {
- return null;
- }
- int size = topics.size();
- ActiveMQTopic childrenDestinations[] = new ActiveMQTopic[size];
- for (int i = 0; i < size; i++) {
- childrenDestinations[i] = toActiveMQTopic(topics.get(i));
- }
-
- ActiveMQTopic topic = new ActiveMQTopic();
- topic.setCompositeDestinations(childrenDestinations);
- return topic;
- }
-
- public ActiveMQTopic toActiveMQTopic(TopicExpressionType topic) throws InvalidTopicException {
- String dialect = topic.getDialect();
- if (dialect == null || SIMPLE_DIALECT.equals(dialect)) {
- for (Iterator iter = topic.getContent().iterator(); iter.hasNext();) {
- ActiveMQTopic answer = createActiveMQTopicFromContent(iter.next());
- if (answer != null) {
- return answer;
- }
- }
- throw new InvalidTopicException("No topic name available topic: " + topic);
- }
- else {
- throw new InvalidTopicException("Topic dialect: " + dialect + " not supported");
- }
- }
-
- // Implementation methods
- // -------------------------------------------------------------------------
- protected ActiveMQTopic createActiveMQTopicFromContent(Object contentItem) {
- if (contentItem instanceof String) {
- return new ActiveMQTopic(((String) contentItem).trim());
- }
- if (contentItem instanceof QName) {
- return createActiveMQTopicFromQName((QName) contentItem);
- }
- return null;
- }
-
- protected ActiveMQTopic createActiveMQTopicFromQName(QName qName) {
- return new ActiveMQTopic(qName.toString());
- }
-
-}
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.wsn.jms;
+
+import java.util.Iterator;
+import java.util.List;
+
+import javax.jms.Topic;
+import javax.xml.namespace.QName;
+
+import org.apache.activemq.command.ActiveMQTopic;
+import org.oasis_open.docs.wsn.b_2.TopicExpressionType;
+
+public class JmsTopicExpressionConverter {
+
+ public static final String SIMPLE_DIALECT = "http://docs.oasis-open.org/wsn/t-1/TopicExpression/Simple";
+
+ public TopicExpressionType toTopicExpression(Topic topic) {
+ return toTopicExpression(topic.toString());
+ }
+
+ public TopicExpressionType toTopicExpression(ActiveMQTopic topic) {
+ return toTopicExpression(topic.getPhysicalName());
+ }
+
+ public TopicExpressionType toTopicExpression(String name) {
+ TopicExpressionType answer = new TopicExpressionType();
+ answer.getContent().add(QName.valueOf(name));
+ answer.setDialect(SIMPLE_DIALECT);
+ return answer;
+ }
+
+ public ActiveMQTopic toActiveMQTopic(List<TopicExpressionType> topics) throws InvalidTopicException {
+ if (topics == null || topics.size() == 0) {
+ return null;
+ }
+ int size = topics.size();
+ ActiveMQTopic childrenDestinations[] = new ActiveMQTopic[size];
+ for (int i = 0; i < size; i++) {
+ childrenDestinations[i] = toActiveMQTopic(topics.get(i));
+ }
+
+ ActiveMQTopic topic = new ActiveMQTopic();
+ topic.setCompositeDestinations(childrenDestinations);
+ return topic;
+ }
+
+ public ActiveMQTopic toActiveMQTopic(TopicExpressionType topic) throws InvalidTopicException {
+ String dialect = topic.getDialect();
+ if (dialect == null || SIMPLE_DIALECT.equals(dialect)) {
+ for (Iterator iter = topic.getContent().iterator(); iter.hasNext();) {
+ ActiveMQTopic answer = createActiveMQTopicFromContent(iter.next());
+ if (answer != null) {
+ return answer;
+ }
+ }
+ throw new InvalidTopicException("No topic name available topic: " + topic);
+ }
+ else {
+ throw new InvalidTopicException("Topic dialect: " + dialect + " not supported");
+ }
+ }
+
+ // Implementation methods
+ // -------------------------------------------------------------------------
+ protected ActiveMQTopic createActiveMQTopicFromContent(Object contentItem) {
+ if (contentItem instanceof String) {
+ return new ActiveMQTopic(((String) contentItem).trim());
+ }
+ if (contentItem instanceof QName) {
+ return createActiveMQTopicFromQName((QName) contentItem);
+ }
+ return null;
+ }
+
+ protected ActiveMQTopic createActiveMQTopicFromQName(QName qName) {
+ return new ActiveMQTopic(qName.toString());
+ }
+
+}
Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsTopicExpressionConverter.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/package.html
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/package.html?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/package.html (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/package.html Tue Feb 21 15:40:05 2006
@@ -1,9 +1,9 @@
-<html>
-<head>
-</head>
-<body>
-
-JMS based implementation of the NotificationBroker and related services.
-
-</body>
-</html>
+<html>
+<head>
+</head>
+<body>
+
+JMS based implementation of the NotificationBroker and related services.
+
+</body>
+</html>
Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/package.html
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/package.html?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/package.html (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/package.html Tue Feb 21 15:40:05 2006
@@ -1,9 +1,9 @@
-<html>
-<head>
-</head>
-<body>
-
-This package provides base classes for WS-Notification implementations.
-
-</body>
-</html>
+<html>
+<head>
+</head>
+<body>
+
+This package provides base classes for WS-Notification implementations.
+
+</body>
+</html>
Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/resources/META-INF/DISCLAIMER.txt
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/resources/META-INF/DISCLAIMER.txt?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/resources/META-INF/DISCLAIMER.txt (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/resources/META-INF/DISCLAIMER.txt Tue Feb 21 15:40:05 2006
@@ -1,7 +1,7 @@
-ActiveMQ is an effort undergoing incubation at the Apache Software Foundation
-(ASF), sponsored by the Geronimo PMC. Incubation is required of all newly
-accepted projects until a further review indicates that the infrastructure,
-communications, and decision making process have stabilized in a manner
-consistent with other successful ASF projects. While incubation status is not
-necessarily a reflection of the completeness or stability of the code, it does
+ActiveMQ is an effort undergoing incubation at the Apache Software Foundation
+(ASF), sponsored by the Geronimo PMC. Incubation is required of all newly
+accepted projects until a further review indicates that the infrastructure,
+communications, and decision making process have stabilized in a manner
+consistent with other successful ASF projects. While incubation status is not
+necessarily a reflection of the completeness or stability of the code, it does
indicate that the project has yet to be fully endorsed by the ASF.
Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/main/resources/META-INF/DISCLAIMER.txt
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/main/resources/META-INF/LICENSE.txt
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/main/wsdl/catalog.xml
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/SubscriptionTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/SubscriptionTest.java?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/SubscriptionTest.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/SubscriptionTest.java Tue Feb 21 15:40:05 2006
@@ -1,140 +1,140 @@
-/*
- * Copyright 2005-2006 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicemix.wsn;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.datatype.XMLGregorianCalendar;
-
-import junit.framework.TestCase;
-
-import org.apache.servicemix.wsn.jaxws.InvalidFilterFault;
-import org.apache.servicemix.wsn.jaxws.InvalidProducerPropertiesExpressionFault;
-import org.apache.servicemix.wsn.jaxws.PauseFailedFault;
-import org.apache.servicemix.wsn.jaxws.ResumeFailedFault;
-import org.apache.servicemix.wsn.jaxws.SubscribeCreationFailedFault;
-import org.apache.servicemix.wsn.jaxws.UnacceptableInitialTerminationTimeFault;
-import org.apache.servicemix.wsn.jaxws.UnacceptableTerminationTimeFault;
-import org.oasis_open.docs.wsn.b_2.Subscribe;
-
-public class SubscriptionTest extends TestCase {
-
- private JAXBContext context;
- private Unmarshaller unmarshaller;
- private AbstractSubscription subscription;
-
- protected void setUp() throws Exception {
- context = JAXBContext.newInstance(Subscribe.class);
- unmarshaller = context.createUnmarshaller();
- subscription = new DummySubscription("mySubscription");
- }
-
- protected Subscribe getSubscription(String file) throws JAXBException, IOException {
- InputStream is = getClass().getResourceAsStream(file);
- Subscribe subscribe = (Subscribe) unmarshaller.unmarshal(is);
- is.close();
- return subscribe;
- }
-
- public void testWithNilITT() throws Exception {
- Subscribe subscribe = getSubscription("subscribe-nil-itt.xml");
- subscription.validateSubscription(subscribe);
- }
-
- public void testWithAbsoluteITT() throws Exception {
- Subscribe subscribe = getSubscription("subscribe-abs-itt.xml");
- try {
- subscription.validateSubscription(subscribe);
- fail("Invalid initial termination time used. Fault was expected.");
- } catch (UnacceptableInitialTerminationTimeFault e) {
- // OK
- }
- }
-
- public void testWithEmptyITT() throws Exception {
- Subscribe subscribe = getSubscription("subscribe-empty-itt.xml");
- try {
- subscription.validateSubscription(subscribe);
- fail("Invalid initial termination time used. Fault was expected.");
- } catch (UnacceptableInitialTerminationTimeFault e) {
- // OK
- }
- }
-
- public void testWithNoITT() throws Exception {
- Subscribe subscribe = getSubscription("subscribe-no-itt.xml");
- subscription.validateSubscription(subscribe);
- }
-
- public void testWithUseRaw() throws Exception {
- Subscribe subscribe = getSubscription("subscribe-raw.xml");
- subscription.validateSubscription(subscribe);
- }
-
- public void testWithProducerProperties() throws Exception {
- Subscribe subscribe = getSubscription("subscribe-pp.xml");
- try {
- subscription.validateSubscription(subscribe);
- fail("ProducerProperties used. Fault was expected.");
- } catch (InvalidProducerPropertiesExpressionFault e) {
- // OK
- }
- }
-
- public void testWithNoTopic() throws Exception {
- Subscribe subscribe = getSubscription("subscribe-no-topic.xml");
- try {
- subscription.validateSubscription(subscribe);
- fail("ProducerProperties used. Fault was expected.");
- } catch (InvalidFilterFault e) {
- // OK
- }
- }
-
- public void testWithEPR() throws Exception {
- Subscribe subscribe = getSubscription("subscribe-epr.xml");
- subscription.validateSubscription(subscribe);
- }
-
- public static class DummySubscription extends AbstractSubscription {
-
- public DummySubscription(String name) {
- super(name);
- }
-
- @Override
- protected void start() throws SubscribeCreationFailedFault {
- }
-
- @Override
- protected void pause() throws PauseFailedFault {
- }
-
- @Override
- protected void resume() throws ResumeFailedFault {
- }
-
- @Override
- protected void renew(XMLGregorianCalendar terminationTime) throws UnacceptableTerminationTimeFault {
- }
-
- }
-
-}
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.wsn;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.datatype.XMLGregorianCalendar;
+
+import junit.framework.TestCase;
+
+import org.apache.servicemix.wsn.jaxws.InvalidFilterFault;
+import org.apache.servicemix.wsn.jaxws.InvalidProducerPropertiesExpressionFault;
+import org.apache.servicemix.wsn.jaxws.PauseFailedFault;
+import org.apache.servicemix.wsn.jaxws.ResumeFailedFault;
+import org.apache.servicemix.wsn.jaxws.SubscribeCreationFailedFault;
+import org.apache.servicemix.wsn.jaxws.UnacceptableInitialTerminationTimeFault;
+import org.apache.servicemix.wsn.jaxws.UnacceptableTerminationTimeFault;
+import org.oasis_open.docs.wsn.b_2.Subscribe;
+
+public class SubscriptionTest extends TestCase {
+
+ private JAXBContext context;
+ private Unmarshaller unmarshaller;
+ private AbstractSubscription subscription;
+
+ protected void setUp() throws Exception {
+ context = JAXBContext.newInstance(Subscribe.class);
+ unmarshaller = context.createUnmarshaller();
+ subscription = new DummySubscription("mySubscription");
+ }
+
+ protected Subscribe getSubscription(String file) throws JAXBException, IOException {
+ InputStream is = getClass().getResourceAsStream(file);
+ Subscribe subscribe = (Subscribe) unmarshaller.unmarshal(is);
+ is.close();
+ return subscribe;
+ }
+
+ public void testWithNilITT() throws Exception {
+ Subscribe subscribe = getSubscription("subscribe-nil-itt.xml");
+ subscription.validateSubscription(subscribe);
+ }
+
+ public void testWithAbsoluteITT() throws Exception {
+ Subscribe subscribe = getSubscription("subscribe-abs-itt.xml");
+ try {
+ subscription.validateSubscription(subscribe);
+ fail("Invalid initial termination time used. Fault was expected.");
+ } catch (UnacceptableInitialTerminationTimeFault e) {
+ // OK
+ }
+ }
+
+ public void testWithEmptyITT() throws Exception {
+ Subscribe subscribe = getSubscription("subscribe-empty-itt.xml");
+ try {
+ subscription.validateSubscription(subscribe);
+ fail("Invalid initial termination time used. Fault was expected.");
+ } catch (UnacceptableInitialTerminationTimeFault e) {
+ // OK
+ }
+ }
+
+ public void testWithNoITT() throws Exception {
+ Subscribe subscribe = getSubscription("subscribe-no-itt.xml");
+ subscription.validateSubscription(subscribe);
+ }
+
+ public void testWithUseRaw() throws Exception {
+ Subscribe subscribe = getSubscription("subscribe-raw.xml");
+ subscription.validateSubscription(subscribe);
+ }
+
+ public void testWithProducerProperties() throws Exception {
+ Subscribe subscribe = getSubscription("subscribe-pp.xml");
+ try {
+ subscription.validateSubscription(subscribe);
+ fail("ProducerProperties used. Fault was expected.");
+ } catch (InvalidProducerPropertiesExpressionFault e) {
+ // OK
+ }
+ }
+
+ public void testWithNoTopic() throws Exception {
+ Subscribe subscribe = getSubscription("subscribe-no-topic.xml");
+ try {
+ subscription.validateSubscription(subscribe);
+ fail("ProducerProperties used. Fault was expected.");
+ } catch (InvalidFilterFault e) {
+ // OK
+ }
+ }
+
+ public void testWithEPR() throws Exception {
+ Subscribe subscribe = getSubscription("subscribe-epr.xml");
+ subscription.validateSubscription(subscribe);
+ }
+
+ public static class DummySubscription extends AbstractSubscription {
+
+ public DummySubscription(String name) {
+ super(name);
+ }
+
+ @Override
+ protected void start() throws SubscribeCreationFailedFault {
+ }
+
+ @Override
+ protected void pause() throws PauseFailedFault {
+ }
+
+ @Override
+ protected void resume() throws ResumeFailedFault {
+ }
+
+ @Override
+ protected void renew(XMLGregorianCalendar terminationTime) throws UnacceptableTerminationTimeFault {
+ }
+
+ }
+
+}
Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/SubscriptionTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/component/WSNComponentTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/component/WSNComponentTest.java?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/component/WSNComponentTest.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/component/WSNComponentTest.java Tue Feb 21 15:40:05 2006
@@ -1,392 +1,392 @@
-/*
- * Copyright 2005-2006 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicemix.wsn.component;
-
-import java.io.File;
-import java.io.StringReader;
-import java.io.StringWriter;
-import java.net.URI;
-import java.net.URL;
-import java.util.List;
-
-import javax.jbi.JBIException;
-import javax.jbi.messaging.ExchangeStatus;
-import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.MessagingException;
-import javax.jbi.messaging.NormalizedMessage;
-import javax.xml.bind.JAXBContext;
-import javax.xml.namespace.QName;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.transform.Source;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.servicemix.MessageExchangeListener;
-import org.apache.servicemix.components.util.ComponentSupport;
-import org.apache.servicemix.jbi.container.ActivationSpec;
-import org.apache.servicemix.jbi.container.JBIContainer;
-import org.apache.servicemix.jbi.jaxp.SourceTransformer;
-import org.apache.servicemix.jbi.jaxp.StringSource;
-import org.apache.servicemix.tck.Receiver;
-import org.apache.servicemix.tck.ReceiverComponent;
-import org.apache.servicemix.wsn.client.AbstractWSAClient;
-import org.apache.servicemix.wsn.client.CreatePullPoint;
-import org.apache.servicemix.wsn.client.NotificationBroker;
-import org.apache.servicemix.wsn.client.Publisher;
-import org.apache.servicemix.wsn.client.PullPoint;
-import org.apache.servicemix.wsn.client.Subscription;
-import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
-import org.oasis_open.docs.wsn.b_2.Notify;
-import org.oasis_open.docs.wsn.b_2.Subscribe;
-import org.oasis_open.docs.wsn.b_2.SubscribeResponse;
-import org.oasis_open.docs.wsn.b_2.Unsubscribe;
-import org.oasis_open.docs.wsn.b_2.UnsubscribeResponse;
-import org.w3._2005._08.addressing.AttributedURIType;
-import org.w3._2005._08.addressing.EndpointReferenceType;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.Node;
-import org.xml.sax.InputSource;
-
-public class WSNComponentTest extends TestCase {
-
- public static QName NOTIFICATION_BROKER = new QName("http://servicemix.org/wsnotification", "NotificationBroker");
-
- private JBIContainer jbi;
- private BrokerService jmsBroker;
- private NotificationBroker wsnBroker;
- private CreatePullPoint wsnCreatePullPoint;
- private WSNComponent wsnComponent;
-
- protected void setUp() throws Exception {
- jmsBroker = new BrokerService();
- jmsBroker.setPersistent(false);
- jmsBroker.addConnector("vm://localhost");
- jmsBroker.start();
-
- jbi = new JBIContainer();
- jbi.setEmbedded(true);
- jbi.init();
- jbi.start();
-
- wsnComponent = new WSNComponent();
- wsnComponent.setConnectionFactory(new ActiveMQConnectionFactory("vm://localhost"));
- ActivationSpec as = new ActivationSpec();
- as.setComponentName("servicemix-wsn2005");
- as.setComponent(wsnComponent);
- jbi.activateComponent(as);
-
- wsnBroker = new NotificationBroker(jbi);
- wsnCreatePullPoint = new CreatePullPoint(jbi);
- }
-
- protected void tearDown() throws Exception {
- if (jbi != null) {
- jbi.shutDown();
- }
- if (jmsBroker != null) {
- jmsBroker.stop();
- }
- }
-
- public void testInvalidSubscribription() throws Exception {
- try {
- wsnBroker.subscribe(null, null, null);
- fail("Expected an exception");
- } catch (JBIException e) {
- // ok
- }
- }
-
- public void testNotify() throws Exception {
- ReceiverComponent receiver = new ReceiverComponent();
- jbi.activateComponent(receiver, "receiver");
-
- EndpointReferenceType consumer = createEPR(ReceiverComponent.SERVICE, ReceiverComponent.ENDPOINT);
- wsnBroker.subscribe(consumer, "myTopic", null);
-
- wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
- // Wait for notification
- Thread.sleep(50);
-
- receiver.getMessageList().assertMessagesReceived(1);
- NormalizedMessage msg = (NormalizedMessage) receiver.getMessageList().getMessages().get(0);
- Node node = new SourceTransformer().toDOMNode(msg);
- assertEquals("Notify", node.getLocalName());
-
- // Wait for acks to be processed
- Thread.sleep(50);
- }
-
- public void testRawNotify() throws Exception {
- ReceiverComponent receiver = new ReceiverComponent();
- jbi.activateComponent(receiver, "receiver");
-
- EndpointReferenceType consumer = createEPR(ReceiverComponent.SERVICE, ReceiverComponent.ENDPOINT);
- wsnBroker.subscribe(consumer, "myTopic", null, true);
-
- wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
- // Wait for notification
- Thread.sleep(50);
-
- receiver.getMessageList().assertMessagesReceived(1);
- NormalizedMessage msg = (NormalizedMessage) receiver.getMessageList().getMessages().get(0);
- Node node = new SourceTransformer().toDOMNode(msg);
- assertEquals("hello", node.getLocalName());
-
- // Wait for acks to be processed
- Thread.sleep(50);
- }
-
- public void testUnsubscribe() throws Exception {
- PullPoint pullPoint = wsnCreatePullPoint.createPullPoint();
- Subscription subscription = wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null);
-
- wsnBroker.notify("myTopic", new Notify());
- // Wait for notification
- Thread.sleep(50);
-
- assertEquals(1, pullPoint.getMessages(0).size());
-
- subscription.unsubscribe();
-
- wsnBroker.notify("myTopic", new Notify());
- // Wait for notification
- Thread.sleep(50);
-
- assertEquals(0, pullPoint.getMessages(0).size());
-
- // Wait for acks to be processed
- Thread.sleep(50);
- }
-
- public void testPauseResume() throws Exception {
- PullPoint pullPoint = wsnCreatePullPoint.createPullPoint();
- Subscription subscription = wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null);
-
- wsnBroker.notify("myTopic", new Notify());
- // Wait for notification
- Thread.sleep(50);
-
- assertEquals(1, pullPoint.getMessages(0).size());
-
- subscription.pause();
-
- wsnBroker.notify("myTopic", new Notify());
- // Wait for notification
- Thread.sleep(50);
-
- assertEquals(0, pullPoint.getMessages(0).size());
-
- subscription.resume();
-
- wsnBroker.notify("myTopic", new Notify());
- // Wait for notification
- Thread.sleep(50);
-
- assertEquals(1, pullPoint.getMessages(0).size());
-
- // Wait for acks to be processed
- Thread.sleep(50);
- }
-
- public void testPull() throws Exception {
- PullPoint pullPoint = wsnCreatePullPoint.createPullPoint();
- wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null);
-
- wsnBroker.notify("myTopic", new Notify());
- // Wait for notification
- Thread.sleep(50);
-
- List<NotificationMessageHolderType> msgs = pullPoint.getMessages(0);
- assertNotNull(msgs);
- assertEquals(1, msgs.size());
-
- // Wait for acks to be processed
- Thread.sleep(50);
- }
-
- public void testPullWithFilter() throws Exception {
- PullPoint pullPoint1 = wsnCreatePullPoint.createPullPoint();
- PullPoint pullPoint2 = wsnCreatePullPoint.createPullPoint();
- wsnBroker.subscribe(pullPoint1.getEndpoint(), "myTopic", "@type = 'a'");
- wsnBroker.subscribe(pullPoint2.getEndpoint(), "myTopic", "@type = 'b'");
-
- wsnBroker.notify("myTopic", parse("<msg type='a'/>"));
- // Wait for notification
- Thread.sleep(500);
-
- assertEquals(1, pullPoint1.getMessages(0).size());
- assertEquals(0, pullPoint2.getMessages(0).size());
-
- wsnBroker.notify("myTopic", parse("<msg type='b'/>"));
- // Wait for notification
- Thread.sleep(500);
-
- assertEquals(0, pullPoint1.getMessages(0).size());
- assertEquals(1, pullPoint2.getMessages(0).size());
-
- wsnBroker.notify("myTopic", parse("<msg type='c'/>"));
- // Wait for notification
- Thread.sleep(500);
-
- assertEquals(0, pullPoint1.getMessages(0).size());
- assertEquals(0, pullPoint2.getMessages(0).size());
- }
-
- public void testDemandBasedPublisher() throws Exception {
- PublisherComponent publisherComponent = new PublisherComponent();
- jbi.activateComponent(publisherComponent, "publisher");
-
- Publisher publisher = wsnBroker.registerPublisher(
- AbstractWSAClient.createWSA(PublisherComponent.SERVICE.getNamespaceURI() + "/" + PublisherComponent.SERVICE.getLocalPart() + "/" + PublisherComponent.ENDPOINT),
- "myTopic", true);
-
- Thread.sleep(50);
- assertNull(publisherComponent.getSubscription());
-
- PullPoint pullPoint = wsnCreatePullPoint.createPullPoint();
- Subscription subscription = wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null);
-
- Thread.sleep(500);
- assertNotNull(publisherComponent.getSubscription());
-
- subscription.unsubscribe();
-
- Thread.sleep(500);
- assertNull(publisherComponent.getSubscription());
-
- publisher.destroy();
-
- Thread.sleep(50);
- }
-
- public void testDeployPullPoint() throws Exception {
- URL url = getClass().getClassLoader().getResource("pullpoint/pullpoint.xml");
- File path = new File(new URI(url.toString()));
- path = path.getParentFile();
- wsnComponent.getServiceUnitManager().deploy("pullpoint", path.getAbsolutePath());
-
- wsnComponent.getServiceUnitManager().start("pullpoint");
-
- wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
- PullPoint pullPoint = new PullPoint(AbstractWSAClient.createWSA("http://www.consumer.org/service/endpoint"),
- jbi);
- assertEquals(1, pullPoint.getMessages(0).size());
- }
-
- public void testDeploySubscription() throws Exception {
- URL url = getClass().getClassLoader().getResource("subscription/subscribe.xml");
- File path = new File(new URI(url.toString()));
- path = path.getParentFile();
- wsnComponent.getServiceUnitManager().deploy("subscription", path.getAbsolutePath());
-
- ActivationSpec consumer = new ActivationSpec();
- consumer.setService(new QName("http://www.consumer.org", "service"));
- consumer.setEndpoint("endpoint");
- Receiver receiver = new ReceiverComponent();
- consumer.setComponent(receiver);
- jbi.activateComponent(consumer);
-
- wsnComponent.getServiceUnitManager().start("subscription");
-
- wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
- // Wait for notification
- Thread.sleep(50);
- receiver.getMessageList().assertMessagesReceived(1);
- receiver.getMessageList().flushMessages();
-
- wsnComponent.getServiceUnitManager().stop("subscription");
-
- wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
- // Wait for notification
- Thread.sleep(50);
- assertEquals(0, receiver.getMessageList().flushMessages().size());
-
- wsnComponent.getServiceUnitManager().start("subscription");
-
- wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
- // Wait for notification
- Thread.sleep(50);
- receiver.getMessageList().assertMessagesReceived(1);
- receiver.getMessageList().flushMessages();
- }
-
- protected Element parse(String txt) throws Exception {
- DocumentBuilder builder = new SourceTransformer().createDocumentBuilder();
- InputSource is = new InputSource(new StringReader(txt));
- Document doc = builder.parse(is);
- return doc.getDocumentElement();
- }
-
- protected EndpointReferenceType createEPR(QName service, String endpoint) {
- EndpointReferenceType epr = new EndpointReferenceType();
- epr.setAddress(new AttributedURIType());
- epr.getAddress().setValue(service.getNamespaceURI() + "/" + service.getLocalPart() + "/" + endpoint);
- return epr;
- }
-
- public static class PublisherComponent extends ComponentSupport implements MessageExchangeListener {
- public static final QName SERVICE = new QName("http://servicemix.org/example", "publisher");
- public static final String ENDPOINT = "publisher";
- private Object subscription;
- public PublisherComponent() {
- super(SERVICE, ENDPOINT);
- }
- public Object getSubscription() {
- return subscription;
- }
- public void onMessageExchange(MessageExchange exchange) throws MessagingException {
- if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
- try {
- JAXBContext jaxbContext = JAXBContext.newInstance(Subscribe.class);
- Source src = exchange.getMessage("in").getContent();
- Object input = jaxbContext.createUnmarshaller().unmarshal(src);
- if (input instanceof Subscribe) {
- subscription = input;
- SubscribeResponse response = new SubscribeResponse();
- response.setSubscriptionReference(AbstractWSAClient.createWSA(PublisherComponent.SERVICE.getNamespaceURI() + "/" + PublisherComponent.SERVICE.getLocalPart() + "/" + PublisherComponent.ENDPOINT));
- StringWriter writer = new StringWriter();
- jaxbContext.createMarshaller().marshal(response, writer);
- NormalizedMessage out = exchange.createMessage();
- out.setContent(new StringSource(writer.toString()));
- exchange.setMessage(out, "out");
- send(exchange);
- } else if (input instanceof Unsubscribe) {
- subscription = null;
- UnsubscribeResponse response = new UnsubscribeResponse();
- StringWriter writer = new StringWriter();
- jaxbContext.createMarshaller().marshal(response, writer);
- NormalizedMessage out = exchange.createMessage();
- out.setContent(new StringSource(writer.toString()));
- exchange.setMessage(out, "out");
- send(exchange);
- } else {
- throw new Exception("Unkown request");
- }
- } catch (Exception e) {
- exchange.setError(e);
- send(exchange);
- }
- } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
- exchange.setStatus(ExchangeStatus.DONE);
- send(exchange);
- }
- }
- }
-}
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.wsn.component;
+
+import java.io.File;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.net.URI;
+import java.net.URL;
+import java.util.List;
+
+import javax.jbi.JBIException;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.xml.bind.JAXBContext;
+import javax.xml.namespace.QName;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.transform.Source;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.servicemix.MessageExchangeListener;
+import org.apache.servicemix.components.util.ComponentSupport;
+import org.apache.servicemix.jbi.container.ActivationSpec;
+import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.jbi.jaxp.SourceTransformer;
+import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.tck.Receiver;
+import org.apache.servicemix.tck.ReceiverComponent;
+import org.apache.servicemix.wsn.client.AbstractWSAClient;
+import org.apache.servicemix.wsn.client.CreatePullPoint;
+import org.apache.servicemix.wsn.client.NotificationBroker;
+import org.apache.servicemix.wsn.client.Publisher;
+import org.apache.servicemix.wsn.client.PullPoint;
+import org.apache.servicemix.wsn.client.Subscription;
+import org.oasis_open.docs.wsn.b_2.NotificationMessageHolderType;
+import org.oasis_open.docs.wsn.b_2.Notify;
+import org.oasis_open.docs.wsn.b_2.Subscribe;
+import org.oasis_open.docs.wsn.b_2.SubscribeResponse;
+import org.oasis_open.docs.wsn.b_2.Unsubscribe;
+import org.oasis_open.docs.wsn.b_2.UnsubscribeResponse;
+import org.w3._2005._08.addressing.AttributedURIType;
+import org.w3._2005._08.addressing.EndpointReferenceType;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.xml.sax.InputSource;
+
+public class WSNComponentTest extends TestCase {
+
+ public static QName NOTIFICATION_BROKER = new QName("http://servicemix.org/wsnotification", "NotificationBroker");
+
+ private JBIContainer jbi;
+ private BrokerService jmsBroker;
+ private NotificationBroker wsnBroker;
+ private CreatePullPoint wsnCreatePullPoint;
+ private WSNComponent wsnComponent;
+
+ protected void setUp() throws Exception {
+ jmsBroker = new BrokerService();
+ jmsBroker.setPersistent(false);
+ jmsBroker.addConnector("vm://localhost");
+ jmsBroker.start();
+
+ jbi = new JBIContainer();
+ jbi.setEmbedded(true);
+ jbi.init();
+ jbi.start();
+
+ wsnComponent = new WSNComponent();
+ wsnComponent.setConnectionFactory(new ActiveMQConnectionFactory("vm://localhost"));
+ ActivationSpec as = new ActivationSpec();
+ as.setComponentName("servicemix-wsn2005");
+ as.setComponent(wsnComponent);
+ jbi.activateComponent(as);
+
+ wsnBroker = new NotificationBroker(jbi);
+ wsnCreatePullPoint = new CreatePullPoint(jbi);
+ }
+
+ protected void tearDown() throws Exception {
+ if (jbi != null) {
+ jbi.shutDown();
+ }
+ if (jmsBroker != null) {
+ jmsBroker.stop();
+ }
+ }
+
+ public void testInvalidSubscribription() throws Exception {
+ try {
+ wsnBroker.subscribe(null, null, null);
+ fail("Expected an exception");
+ } catch (JBIException e) {
+ // ok
+ }
+ }
+
+ public void testNotify() throws Exception {
+ ReceiverComponent receiver = new ReceiverComponent();
+ jbi.activateComponent(receiver, "receiver");
+
+ EndpointReferenceType consumer = createEPR(ReceiverComponent.SERVICE, ReceiverComponent.ENDPOINT);
+ wsnBroker.subscribe(consumer, "myTopic", null);
+
+ wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
+ // Wait for notification
+ Thread.sleep(50);
+
+ receiver.getMessageList().assertMessagesReceived(1);
+ NormalizedMessage msg = (NormalizedMessage) receiver.getMessageList().getMessages().get(0);
+ Node node = new SourceTransformer().toDOMNode(msg);
+ assertEquals("Notify", node.getLocalName());
+
+ // Wait for acks to be processed
+ Thread.sleep(50);
+ }
+
+ public void testRawNotify() throws Exception {
+ ReceiverComponent receiver = new ReceiverComponent();
+ jbi.activateComponent(receiver, "receiver");
+
+ EndpointReferenceType consumer = createEPR(ReceiverComponent.SERVICE, ReceiverComponent.ENDPOINT);
+ wsnBroker.subscribe(consumer, "myTopic", null, true);
+
+ wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
+ // Wait for notification
+ Thread.sleep(50);
+
+ receiver.getMessageList().assertMessagesReceived(1);
+ NormalizedMessage msg = (NormalizedMessage) receiver.getMessageList().getMessages().get(0);
+ Node node = new SourceTransformer().toDOMNode(msg);
+ assertEquals("hello", node.getLocalName());
+
+ // Wait for acks to be processed
+ Thread.sleep(50);
+ }
+
+ public void testUnsubscribe() throws Exception {
+ PullPoint pullPoint = wsnCreatePullPoint.createPullPoint();
+ Subscription subscription = wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null);
+
+ wsnBroker.notify("myTopic", new Notify());
+ // Wait for notification
+ Thread.sleep(50);
+
+ assertEquals(1, pullPoint.getMessages(0).size());
+
+ subscription.unsubscribe();
+
+ wsnBroker.notify("myTopic", new Notify());
+ // Wait for notification
+ Thread.sleep(50);
+
+ assertEquals(0, pullPoint.getMessages(0).size());
+
+ // Wait for acks to be processed
+ Thread.sleep(50);
+ }
+
+ public void testPauseResume() throws Exception {
+ PullPoint pullPoint = wsnCreatePullPoint.createPullPoint();
+ Subscription subscription = wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null);
+
+ wsnBroker.notify("myTopic", new Notify());
+ // Wait for notification
+ Thread.sleep(50);
+
+ assertEquals(1, pullPoint.getMessages(0).size());
+
+ subscription.pause();
+
+ wsnBroker.notify("myTopic", new Notify());
+ // Wait for notification
+ Thread.sleep(50);
+
+ assertEquals(0, pullPoint.getMessages(0).size());
+
+ subscription.resume();
+
+ wsnBroker.notify("myTopic", new Notify());
+ // Wait for notification
+ Thread.sleep(50);
+
+ assertEquals(1, pullPoint.getMessages(0).size());
+
+ // Wait for acks to be processed
+ Thread.sleep(50);
+ }
+
+ public void testPull() throws Exception {
+ PullPoint pullPoint = wsnCreatePullPoint.createPullPoint();
+ wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null);
+
+ wsnBroker.notify("myTopic", new Notify());
+ // Wait for notification
+ Thread.sleep(50);
+
+ List<NotificationMessageHolderType> msgs = pullPoint.getMessages(0);
+ assertNotNull(msgs);
+ assertEquals(1, msgs.size());
+
+ // Wait for acks to be processed
+ Thread.sleep(50);
+ }
+
+ public void testPullWithFilter() throws Exception {
+ PullPoint pullPoint1 = wsnCreatePullPoint.createPullPoint();
+ PullPoint pullPoint2 = wsnCreatePullPoint.createPullPoint();
+ wsnBroker.subscribe(pullPoint1.getEndpoint(), "myTopic", "@type = 'a'");
+ wsnBroker.subscribe(pullPoint2.getEndpoint(), "myTopic", "@type = 'b'");
+
+ wsnBroker.notify("myTopic", parse("<msg type='a'/>"));
+ // Wait for notification
+ Thread.sleep(500);
+
+ assertEquals(1, pullPoint1.getMessages(0).size());
+ assertEquals(0, pullPoint2.getMessages(0).size());
+
+ wsnBroker.notify("myTopic", parse("<msg type='b'/>"));
+ // Wait for notification
+ Thread.sleep(500);
+
+ assertEquals(0, pullPoint1.getMessages(0).size());
+ assertEquals(1, pullPoint2.getMessages(0).size());
+
+ wsnBroker.notify("myTopic", parse("<msg type='c'/>"));
+ // Wait for notification
+ Thread.sleep(500);
+
+ assertEquals(0, pullPoint1.getMessages(0).size());
+ assertEquals(0, pullPoint2.getMessages(0).size());
+ }
+
+ public void testDemandBasedPublisher() throws Exception {
+ PublisherComponent publisherComponent = new PublisherComponent();
+ jbi.activateComponent(publisherComponent, "publisher");
+
+ Publisher publisher = wsnBroker.registerPublisher(
+ AbstractWSAClient.createWSA(PublisherComponent.SERVICE.getNamespaceURI() + "/" + PublisherComponent.SERVICE.getLocalPart() + "/" + PublisherComponent.ENDPOINT),
+ "myTopic", true);
+
+ Thread.sleep(50);
+ assertNull(publisherComponent.getSubscription());
+
+ PullPoint pullPoint = wsnCreatePullPoint.createPullPoint();
+ Subscription subscription = wsnBroker.subscribe(pullPoint.getEndpoint(), "myTopic", null);
+
+ Thread.sleep(500);
+ assertNotNull(publisherComponent.getSubscription());
+
+ subscription.unsubscribe();
+
+ Thread.sleep(500);
+ assertNull(publisherComponent.getSubscription());
+
+ publisher.destroy();
+
+ Thread.sleep(50);
+ }
+
+ public void testDeployPullPoint() throws Exception {
+ URL url = getClass().getClassLoader().getResource("pullpoint/pullpoint.xml");
+ File path = new File(new URI(url.toString()));
+ path = path.getParentFile();
+ wsnComponent.getServiceUnitManager().deploy("pullpoint", path.getAbsolutePath());
+
+ wsnComponent.getServiceUnitManager().start("pullpoint");
+
+ wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
+ PullPoint pullPoint = new PullPoint(AbstractWSAClient.createWSA("http://www.consumer.org/service/endpoint"),
+ jbi);
+ assertEquals(1, pullPoint.getMessages(0).size());
+ }
+
+ public void testDeploySubscription() throws Exception {
+ URL url = getClass().getClassLoader().getResource("subscription/subscribe.xml");
+ File path = new File(new URI(url.toString()));
+ path = path.getParentFile();
+ wsnComponent.getServiceUnitManager().deploy("subscription", path.getAbsolutePath());
+
+ ActivationSpec consumer = new ActivationSpec();
+ consumer.setService(new QName("http://www.consumer.org", "service"));
+ consumer.setEndpoint("endpoint");
+ Receiver receiver = new ReceiverComponent();
+ consumer.setComponent(receiver);
+ jbi.activateComponent(consumer);
+
+ wsnComponent.getServiceUnitManager().start("subscription");
+
+ wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
+ // Wait for notification
+ Thread.sleep(50);
+ receiver.getMessageList().assertMessagesReceived(1);
+ receiver.getMessageList().flushMessages();
+
+ wsnComponent.getServiceUnitManager().stop("subscription");
+
+ wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
+ // Wait for notification
+ Thread.sleep(50);
+ assertEquals(0, receiver.getMessageList().flushMessages().size());
+
+ wsnComponent.getServiceUnitManager().start("subscription");
+
+ wsnBroker.notify("myTopic", parse("<hello>world</hello>"));
+ // Wait for notification
+ Thread.sleep(50);
+ receiver.getMessageList().assertMessagesReceived(1);
+ receiver.getMessageList().flushMessages();
+ }
+
+ protected Element parse(String txt) throws Exception {
+ DocumentBuilder builder = new SourceTransformer().createDocumentBuilder();
+ InputSource is = new InputSource(new StringReader(txt));
+ Document doc = builder.parse(is);
+ return doc.getDocumentElement();
+ }
+
+ protected EndpointReferenceType createEPR(QName service, String endpoint) {
+ EndpointReferenceType epr = new EndpointReferenceType();
+ epr.setAddress(new AttributedURIType());
+ epr.getAddress().setValue(service.getNamespaceURI() + "/" + service.getLocalPart() + "/" + endpoint);
+ return epr;
+ }
+
+ public static class PublisherComponent extends ComponentSupport implements MessageExchangeListener {
+ public static final QName SERVICE = new QName("http://servicemix.org/example", "publisher");
+ public static final String ENDPOINT = "publisher";
+ private Object subscription;
+ public PublisherComponent() {
+ super(SERVICE, ENDPOINT);
+ }
+ public Object getSubscription() {
+ return subscription;
+ }
+ public void onMessageExchange(MessageExchange exchange) throws MessagingException {
+ if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
+ try {
+ JAXBContext jaxbContext = JAXBContext.newInstance(Subscribe.class);
+ Source src = exchange.getMessage("in").getContent();
+ Object input = jaxbContext.createUnmarshaller().unmarshal(src);
+ if (input instanceof Subscribe) {
+ subscription = input;
+ SubscribeResponse response = new SubscribeResponse();
+ response.setSubscriptionReference(AbstractWSAClient.createWSA(PublisherComponent.SERVICE.getNamespaceURI() + "/" + PublisherComponent.SERVICE.getLocalPart() + "/" + PublisherComponent.ENDPOINT));
+ StringWriter writer = new StringWriter();
+ jaxbContext.createMarshaller().marshal(response, writer);
+ NormalizedMessage out = exchange.createMessage();
+ out.setContent(new StringSource(writer.toString()));
+ exchange.setMessage(out, "out");
+ send(exchange);
+ } else if (input instanceof Unsubscribe) {
+ subscription = null;
+ UnsubscribeResponse response = new UnsubscribeResponse();
+ StringWriter writer = new StringWriter();
+ jaxbContext.createMarshaller().marshal(response, writer);
+ NormalizedMessage out = exchange.createMessage();
+ out.setContent(new StringSource(writer.toString()));
+ exchange.setMessage(out, "out");
+ send(exchange);
+ } else {
+ throw new Exception("Unkown request");
+ }
+ } catch (Exception e) {
+ exchange.setError(e);
+ send(exchange);
+ }
+ } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
+ exchange.setStatus(ExchangeStatus.DONE);
+ send(exchange);
+ }
+ }
+ }
+}
Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/component/WSNComponentTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/log4j-tests.properties
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/log4j-tests.properties?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/log4j-tests.properties (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/log4j-tests.properties Tue Feb 21 15:40:05 2006
@@ -1,21 +1,21 @@
-#
-# The logging properties used during tests..
-#
-log4j.rootLogger=DEBUG, out
-
-log4j.logger.org.apache.activemq=INFO
-log4j.logger.org.apache.activemq.spring=WARN
-log4j.logger.org.apache.activemq.store.journal=INFO
-log4j.logger.org.activeio.journal=INFO
-
-# CONSOLE appender not used by default
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
-
-# File appender
-log4j.appender.out=org.apache.log4j.FileAppender
-log4j.appender.out.layout=org.apache.log4j.PatternLayout
-log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
-log4j.appender.out.file=target/servicemix-test.log
-log4j.appender.out.append=true
+#
+# The logging properties used during tests..
+#
+log4j.rootLogger=DEBUG, out
+
+log4j.logger.org.apache.activemq=INFO
+log4j.logger.org.apache.activemq.spring=WARN
+log4j.logger.org.apache.activemq.store.journal=INFO
+log4j.logger.org.activeio.journal=INFO
+
+# CONSOLE appender not used by default
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+# File appender
+log4j.appender.out=org.apache.log4j.FileAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.out.file=target/servicemix-test.log
+log4j.appender.out.append=true
Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/log4j-tests.properties
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/log4j.properties?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/log4j.properties (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/log4j.properties Tue Feb 21 15:40:05 2006
@@ -1,21 +1,21 @@
-#
-# The logging properties used during tests..
-#
-log4j.rootLogger=DEBUG, stdout
-
-log4j.logger.org.apache.activemq=INFO
-log4j.logger.org.apache.activemq.spring=WARN
-log4j.logger.org.apache.activemq.store.journal=INFO
-log4j.logger.org.activeio.journal=INFO
-
-# CONSOLE appender not used by default
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
-
-# File appender
-log4j.appender.out=org.apache.log4j.FileAppender
-log4j.appender.out.layout=org.apache.log4j.PatternLayout
-log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
-log4j.appender.out.file=target/servicemix-test.log
-log4j.appender.out.append=true
+#
+# The logging properties used during tests..
+#
+log4j.rootLogger=DEBUG, stdout
+
+log4j.logger.org.apache.activemq=INFO
+log4j.logger.org.apache.activemq.spring=WARN
+log4j.logger.org.apache.activemq.store.journal=INFO
+log4j.logger.org.activeio.journal=INFO
+
+# CONSOLE appender not used by default
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+# File appender
+log4j.appender.out=org.apache.log4j.FileAppender
+log4j.appender.out.layout=org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
+log4j.appender.out.file=target/servicemix-test.log
+log4j.appender.out.append=true
Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/log4j.properties
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/org/apache/servicemix/wsn/subscribe-abs-itt.xml
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/org/apache/servicemix/wsn/subscribe-abs-itt.xml?rev=379627&r1=379626&r2=379627&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/org/apache/servicemix/wsn/subscribe-abs-itt.xml (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/org/apache/servicemix/wsn/subscribe-abs-itt.xml Tue Feb 21 15:40:05 2006
@@ -1,23 +1,23 @@
-<?xml version="1.0"?>
-<wsnt:Subscribe
- xmlns:wsnt="http://docs.oasis-open.org/wsn/b-2"
- xmlns:wsa="http://www.w3.org/2005/08/addressing"
- xmlns:ncex="http://www.consumer.org"
- xmlns:npex="http://www.producer.org">
- <wsnt:ConsumerReference>
- <wsa:Address>
- http://www.consumer.org/ConsumerEndpoint
- </wsa:Address>
- </wsnt:ConsumerReference>
- <wsnt:Filter>
- <wsnt:TopicExpression Dialect="http://docs.oasis-open.org/wsn/t-1/TopicExpression/Simple">
- npex:SomeTopic
- </wsnt:TopicExpression>
- <wsnt:MessageContent Dialect="http://www.w3.org/TR/1999/REC-xpath-19991116">
- boolean(ncex:Producer="15")
- </wsnt:MessageContent>
- </wsnt:Filter>
- <wsnt:InitialTerminationTime>
- 2007-12-25T00:00:00.00000Z
- </wsnt:InitialTerminationTime>
+<?xml version="1.0"?>
+<wsnt:Subscribe
+ xmlns:wsnt="http://docs.oasis-open.org/wsn/b-2"
+ xmlns:wsa="http://www.w3.org/2005/08/addressing"
+ xmlns:ncex="http://www.consumer.org"
+ xmlns:npex="http://www.producer.org">
+ <wsnt:ConsumerReference>
+ <wsa:Address>
+ http://www.consumer.org/ConsumerEndpoint
+ </wsa:Address>
+ </wsnt:ConsumerReference>
+ <wsnt:Filter>
+ <wsnt:TopicExpression Dialect="http://docs.oasis-open.org/wsn/t-1/TopicExpression/Simple">
+ npex:SomeTopic
+ </wsnt:TopicExpression>
+ <wsnt:MessageContent Dialect="http://www.w3.org/TR/1999/REC-xpath-19991116">
+ boolean(ncex:Producer="15")
+ </wsnt:MessageContent>
+ </wsnt:Filter>
+ <wsnt:InitialTerminationTime>
+ 2007-12-25T00:00:00.00000Z
+ </wsnt:InitialTerminationTime>
</wsnt:Subscribe>
Propchange: incubator/servicemix/trunk/servicemix-wsn2005/src/test/resources/org/apache/servicemix/wsn/subscribe-abs-itt.xml
------------------------------------------------------------------------------
svn:eol-style = native