You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2007/04/08 23:19:32 UTC
svn commit: r526601 [3/3] - in
/incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src:
main/java/org/apache/servicemix/wsn/
main/java/org/apache/servicemix/wsn/client/
main/java/org/apache/servicemix/wsn/component/ main/java/org/...
Modified: incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsPullPoint.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsPullPoint.java?view=diff&rev=526601&r1=526600&r2=526601
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsPullPoint.java (original)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsPullPoint.java Sun Apr 8 14:19:30 2007
@@ -43,105 +43,111 @@
public class JmsPullPoint extends AbstractPullPoint {
- private static Log log = LogFactory.getLog(JmsPullPoint.class);
-
- private JAXBContext jaxbContext;
- private Connection connection;
- private Session session;
- private Queue queue;
- private MessageProducer producer;
- private MessageConsumer consumer;
-
- public JmsPullPoint(String name) {
- super(name);
- try {
- jaxbContext = JAXBContext.newInstance(Notify.class);
- } catch (JAXBException e) {
- throw new RuntimeException("Could not create PullEndpoint", e);
- }
- }
-
- protected void initSession() throws JMSException {
- if (session == null) {
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- queue = session.createQueue(getName());
- producer = session.createProducer(queue);
- consumer = session.createConsumer(queue);
- }
- }
-
- @Override
- protected synchronized void store(NotificationMessageHolderType messageHolder) {
- try {
- initSession();
+ private static Log log = LogFactory.getLog(JmsPullPoint.class);
+
+ private JAXBContext jaxbContext;
+
+ private Connection connection;
+
+ private Session session;
+
+ private Queue queue;
+
+ private MessageProducer producer;
+
+ private MessageConsumer consumer;
+
+ public JmsPullPoint(String name) {
+ super(name);
+ try {
+ jaxbContext = JAXBContext.newInstance(Notify.class);
+ } catch (JAXBException e) {
+ throw new RuntimeException("Could not create PullEndpoint", e);
+ }
+ }
+
+ protected void initSession() throws JMSException {
+ if (session == null) {
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ queue = session.createQueue(getName());
+ producer = session.createProducer(queue);
+ consumer = session.createConsumer(queue);
+ }
+ }
+
+ @Override
+ protected synchronized void store(NotificationMessageHolderType messageHolder) {
+ try {
+ initSession();
Notify notify = new Notify();
notify.getNotificationMessage().add(messageHolder);
StringWriter writer = new StringWriter();
jaxbContext.createMarshaller().marshal(notify, writer);
Message message = session.createTextMessage(writer.toString());
producer.send(message);
- } catch (JMSException e) {
- log.warn("Error storing message", e);
- if (session != null) {
- try {
- session.close();
- } catch (JMSException inner) {
- log.debug("Error closing session", inner);
- } finally {
- session = null;
- }
- }
- } catch (JAXBException e) {
- log.warn("Error storing message", e);
- }
- }
-
- @Override
- protected synchronized List<NotificationMessageHolderType> getMessages(int max) throws ResourceUnknownFault, UnableToGetMessagesFault {
- Session session = null;
- try {
- if (max == 0) {
- max = 256;
- }
- initSession();
- List<NotificationMessageHolderType> messages = new ArrayList<NotificationMessageHolderType>();
- for (int i = 0; i < max; i++) {
- Message msg = consumer.receiveNoWait();
- if (msg == null) {
- break;
- }
- TextMessage txtMsg = (TextMessage) msg;
- StringReader reader = new StringReader(txtMsg.getText());
- Notify notify = (Notify) jaxbContext.createUnmarshaller().unmarshal(reader);
- messages.addAll(notify.getNotificationMessage());
- }
- return messages;
- } catch (JMSException e) {
- log.info("Error retrieving messages", e);
- if (session != null) {
- try {
- session.close();
- } catch (JMSException inner) {
- log.debug("Error closing session", inner);
- } finally {
- session = null;
- }
- }
+ } catch (JMSException e) {
+ log.warn("Error storing message", e);
+ if (session != null) {
+ try {
+ session.close();
+ } catch (JMSException inner) {
+ log.debug("Error closing session", inner);
+ } finally {
+ session = null;
+ }
+ }
+ } catch (JAXBException e) {
+ log.warn("Error storing message", e);
+ }
+ }
+
+ @Override
+ protected synchronized List<NotificationMessageHolderType> getMessages(int max) throws ResourceUnknownFault,
+ UnableToGetMessagesFault {
+ Session jmsSession = null;
+ try {
+ if (max == 0) {
+ max = 256;
+ }
+ initSession();
+ List<NotificationMessageHolderType> messages = new ArrayList<NotificationMessageHolderType>();
+ for (int i = 0; i < max; i++) {
+ Message msg = consumer.receiveNoWait();
+ if (msg == null) {
+ break;
+ }
+ TextMessage txtMsg = (TextMessage) msg;
+ StringReader reader = new StringReader(txtMsg.getText());
+ Notify notify = (Notify) jaxbContext.createUnmarshaller().unmarshal(reader);
+ messages.addAll(notify.getNotificationMessage());
+ }
+ return messages;
+ } catch (JMSException e) {
+ log.info("Error retrieving messages", e);
+ if (jmsSession != null) {
+ try {
+ jmsSession.close();
+ } catch (JMSException inner) {
+ log.debug("Error closing session", inner);
+ } finally {
+ jmsSession = null;
+ }
+ }
UnableToGetMessagesFaultType fault = new UnableToGetMessagesFaultType();
- throw new UnableToGetMessagesFault("Unable to retrieve messages", fault, e);
- } catch (JAXBException e) {
- log.info("Error retrieving messages", e);
+ throw new UnableToGetMessagesFault("Unable to retrieve messages", fault, e);
+ } catch (JAXBException e) {
+ log.info("Error retrieving messages", e);
UnableToGetMessagesFaultType fault = new UnableToGetMessagesFaultType();
- throw new UnableToGetMessagesFault("Unable to retrieve messages", fault, e);
- }
- }
-
- public Connection getConnection() {
- return connection;
- }
-
- public void setConnection(Connection connection) {
- this.connection = connection;
- }
+ throw new UnableToGetMessagesFault("Unable to retrieve messages", fault, e);
+ }
+ }
+
+ public Connection getConnection() {
+ return connection;
+ }
+
+ public void setConnection(Connection connection) {
+ this.connection = connection;
+ }
}
Modified: incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsSubscription.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsSubscription.java?view=diff&rev=526601&r1=526600&r2=526601
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsSubscription.java (original)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsSubscription.java Sun Apr 8 14:19:30 2007
@@ -34,6 +34,10 @@
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.xml.sax.InputSource;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.wsn.AbstractSubscription;
@@ -56,159 +60,160 @@
import org.oasis_open.docs.wsn.b_2.SubscribeCreationFailedFaultType;
import org.oasis_open.docs.wsn.b_2.UnableToDestroySubscriptionFaultType;
import org.oasis_open.docs.wsn.b_2.UnacceptableTerminationTimeFaultType;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.xml.sax.InputSource;
public abstract class JmsSubscription extends AbstractSubscription implements MessageListener {
- private static Log log = LogFactory.getLog(JmsSubscription.class);
-
- private Connection connection;
- private Session session;
+ private static Log log = LogFactory.getLog(JmsSubscription.class);
+
+ private Connection connection;
+
+ private Session session;
+
private JmsTopicExpressionConverter topicConverter;
+
private Topic jmsTopic;
-
- public JmsSubscription(String name) {
- super(name);
- topicConverter = new JmsTopicExpressionConverter();
- }
-
- protected void start() throws SubscribeCreationFailedFault {
- try {
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(jmsTopic);
+
+ public JmsSubscription(String name) {
+ super(name);
+ topicConverter = new JmsTopicExpressionConverter();
+ }
+
+ protected void start() throws SubscribeCreationFailedFault {
+ try {
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(jmsTopic);
consumer.setMessageListener(this);
- } catch (JMSException e) {
- SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType();
- throw new SubscribeCreationFailedFault("Error starting subscription", fault, e);
- }
- }
-
- @Override
- protected void validateSubscription(Subscribe subscribeRequest) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
- super.validateSubscription(subscribeRequest);
- try {
- jmsTopic = topicConverter.toActiveMQTopic(topic);
- } catch (InvalidTopicException e) {
- InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType();
- throw new InvalidTopicExpressionFault(e.getMessage(), fault);
- }
- }
-
- @Override
- protected void pause() throws PauseFailedFault {
- if (session == null) {
- PauseFailedFaultType fault = new PauseFailedFaultType();
- throw new PauseFailedFault("Subscription is already paused", fault);
- } else {
- try {
- session.close();
- } catch (JMSException e) {
- PauseFailedFaultType fault = new PauseFailedFaultType();
- throw new PauseFailedFault("Error pausing subscription", fault, e);
- } finally {
- session = null;
- }
- }
- }
-
- @Override
- protected void resume() throws ResumeFailedFault {
- if (session != null) {
- ResumeFailedFaultType fault = new ResumeFailedFaultType();
- throw new ResumeFailedFault("Subscription is already running", fault);
- } else {
- try {
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(jmsTopic);
- consumer.setMessageListener(this);
- } catch (JMSException e) {
- ResumeFailedFaultType fault = new ResumeFailedFaultType();
- throw new ResumeFailedFault("Error resuming subscription", fault, e);
- }
- }
- }
-
- @Override
- protected void renew(XMLGregorianCalendar terminationTime) throws UnacceptableTerminationTimeFault {
- UnacceptableTerminationTimeFaultType fault = new UnacceptableTerminationTimeFaultType();
- throw new UnacceptableTerminationTimeFault(
- "TerminationTime is not supported",
- fault);
- }
-
- @Override
- protected void unsubscribe() throws UnableToDestroySubscriptionFault {
- super.unsubscribe();
- if (session != null) {
- try {
- session.close();
- } catch (JMSException e) {
- UnableToDestroySubscriptionFaultType fault = new UnableToDestroySubscriptionFaultType();
- throw new UnableToDestroySubscriptionFault("Unable to unsubscribe", fault, e);
- } finally {
- session = null;
- }
- }
- }
-
- public Connection getConnection() {
- return connection;
- }
-
- public void setConnection(Connection connection) {
- this.connection = connection;
- }
-
- public void onMessage(Message jmsMessage) {
- try {
- TextMessage text = (TextMessage) jmsMessage;
- DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
- factory.setNamespaceAware(true);
- Document doc = factory.newDocumentBuilder().parse(new InputSource(new StringReader(text.getText())));
- Element root = doc.getDocumentElement();
- Element holder = (Element) root.getElementsByTagNameNS(WSN_URI, "NotificationMessage").item(0);
- Element message = (Element) holder.getElementsByTagNameNS(WSN_URI, "Message").item(0);
- Element content = null;
- for (int i = 0; i < message.getChildNodes().getLength(); i++) {
- if (message.getChildNodes().item(i) instanceof Element) {
- content = (Element) message.getChildNodes().item(i);
- break;
- }
- }
- boolean match = doFilter(content);
- if (match) {
- if (useRaw) {
- doNotify(content);
- } else {
- doNotify(root);
- }
- }
- } catch (Exception e) {
- log.warn("Error notifying consumer", e);
- }
- }
-
- protected boolean doFilter(Element content) {
- if (contentFilter != null) {
- if (!contentFilter.getDialect().equals(XPATH1_URI)) {
- throw new IllegalStateException("Unsupported dialect: " + contentFilter.getDialect());
- }
- try {
- XPathFactory xpfactory = XPathFactory.newInstance();
- XPath xpath = xpfactory.newXPath();
- XPathExpression exp = xpath.compile(contentFilter.getContent().get(0).toString());
- Boolean ret = (Boolean) exp.evaluate(content, XPathConstants.BOOLEAN);
- return ret.booleanValue();
- } catch (XPathExpressionException e) {
- log.warn("Could not filter notification", e);
- }
- return false;
- }
- return true;
- }
-
- protected abstract void doNotify(Element content);
+ } catch (JMSException e) {
+ SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType();
+ throw new SubscribeCreationFailedFault("Error starting subscription", fault, e);
+ }
+ }
+
+ @Override
+ protected void validateSubscription(Subscribe subscribeRequest) throws InvalidFilterFault,
+ InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault,
+ InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault,
+ TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
+ super.validateSubscription(subscribeRequest);
+ try {
+ jmsTopic = topicConverter.toActiveMQTopic(topic);
+ } catch (InvalidTopicException e) {
+ InvalidTopicExpressionFaultType fault = new InvalidTopicExpressionFaultType();
+ throw new InvalidTopicExpressionFault(e.getMessage(), fault);
+ }
+ }
+
+ @Override
+ protected void pause() throws PauseFailedFault {
+ if (session == null) {
+ PauseFailedFaultType fault = new PauseFailedFaultType();
+ throw new PauseFailedFault("Subscription is already paused", fault);
+ } else {
+ try {
+ session.close();
+ } catch (JMSException e) {
+ PauseFailedFaultType fault = new PauseFailedFaultType();
+ throw new PauseFailedFault("Error pausing subscription", fault, e);
+ } finally {
+ session = null;
+ }
+ }
+ }
+
+ @Override
+ protected void resume() throws ResumeFailedFault {
+ if (session != null) {
+ ResumeFailedFaultType fault = new ResumeFailedFaultType();
+ throw new ResumeFailedFault("Subscription is already running", fault);
+ } else {
+ try {
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(jmsTopic);
+ consumer.setMessageListener(this);
+ } catch (JMSException e) {
+ ResumeFailedFaultType fault = new ResumeFailedFaultType();
+ throw new ResumeFailedFault("Error resuming subscription", fault, e);
+ }
+ }
+ }
+
+ @Override
+ protected void renew(XMLGregorianCalendar terminationTime) throws UnacceptableTerminationTimeFault {
+ UnacceptableTerminationTimeFaultType fault = new UnacceptableTerminationTimeFaultType();
+ throw new UnacceptableTerminationTimeFault("TerminationTime is not supported", fault);
+ }
+
+ @Override
+ protected void unsubscribe() throws UnableToDestroySubscriptionFault {
+ super.unsubscribe();
+ if (session != null) {
+ try {
+ session.close();
+ } catch (JMSException e) {
+ UnableToDestroySubscriptionFaultType fault = new UnableToDestroySubscriptionFaultType();
+ throw new UnableToDestroySubscriptionFault("Unable to unsubscribe", fault, e);
+ } finally {
+ session = null;
+ }
+ }
+ }
+
+ public Connection getConnection() {
+ return connection;
+ }
+
+ public void setConnection(Connection connection) {
+ this.connection = connection;
+ }
+
+ public void onMessage(Message jmsMessage) {
+ try {
+ TextMessage text = (TextMessage) jmsMessage;
+ DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+ factory.setNamespaceAware(true);
+ Document doc = factory.newDocumentBuilder().parse(new InputSource(new StringReader(text.getText())));
+ Element root = doc.getDocumentElement();
+ Element holder = (Element) root.getElementsByTagNameNS(WSN_URI, "NotificationMessage").item(0);
+ Element message = (Element) holder.getElementsByTagNameNS(WSN_URI, "Message").item(0);
+ Element content = null;
+ for (int i = 0; i < message.getChildNodes().getLength(); i++) {
+ if (message.getChildNodes().item(i) instanceof Element) {
+ content = (Element) message.getChildNodes().item(i);
+ break;
+ }
+ }
+ boolean match = doFilter(content);
+ if (match) {
+ if (useRaw) {
+ doNotify(content);
+ } else {
+ doNotify(root);
+ }
+ }
+ } catch (Exception e) {
+ log.warn("Error notifying consumer", e);
+ }
+ }
+
+ protected boolean doFilter(Element content) {
+ if (contentFilter != null) {
+ if (!contentFilter.getDialect().equals(XPATH1_URI)) {
+ throw new IllegalStateException("Unsupported dialect: " + contentFilter.getDialect());
+ }
+ try {
+ XPathFactory xpfactory = XPathFactory.newInstance();
+ XPath xpath = xpfactory.newXPath();
+ XPathExpression exp = xpath.compile(contentFilter.getContent().get(0).toString());
+ Boolean ret = (Boolean) exp.evaluate(content, XPathConstants.BOOLEAN);
+ return ret.booleanValue();
+ } catch (XPathExpressionException e) {
+ log.warn("Could not filter notification", e);
+ }
+ return false;
+ }
+ return true;
+ }
+
+ protected abstract void doNotify(Element content);
}
Modified: incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsTopicExpressionConverter.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsTopicExpressionConverter.java?view=diff&rev=526601&r1=526600&r2=526601
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsTopicExpressionConverter.java (original)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jms/JmsTopicExpressionConverter.java Sun Apr 8 14:19:30 2007
@@ -69,9 +69,8 @@
}
}
throw new InvalidTopicException("No topic name available topic: " + topic);
- }
- else {
- throw new InvalidTopicException("Topic dialect: " + dialect + " not supported");
+ } else {
+ throw new InvalidTopicException("Topic dialect: " + dialect + " not supported");
}
}
Modified: incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/spring/CreatePullPointFactoryBean.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/spring/CreatePullPointFactoryBean.java?view=diff&rev=526601&r1=526600&r2=526601
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/spring/CreatePullPointFactoryBean.java (original)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/spring/CreatePullPointFactoryBean.java Sun Apr 8 14:19:30 2007
@@ -18,12 +18,13 @@
import javax.xml.parsers.DocumentBuilderFactory;
-import org.oasis_open.docs.wsn.b_2.CreatePullPoint;
-import org.springframework.beans.factory.FactoryBean;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Text;
+import org.oasis_open.docs.wsn.b_2.CreatePullPoint;
+import org.springframework.beans.factory.FactoryBean;
+
/**
*
* @author gnodet
@@ -33,7 +34,7 @@
public class CreatePullPointFactoryBean implements FactoryBean {
private String address;
-
+
/**
* @return Returns the address.
*/
Modified: incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/spring/PublisherComponent.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/spring/PublisherComponent.java?view=diff&rev=526601&r1=526600&r2=526601
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/spring/PublisherComponent.java (original)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/spring/PublisherComponent.java Sun Apr 8 14:19:30 2007
@@ -26,6 +26,8 @@
import javax.xml.bind.JAXBContext;
import javax.xml.transform.Source;
+import org.w3c.dom.Element;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.MessageExchangeListener;
@@ -38,7 +40,6 @@
import org.oasis_open.docs.wsn.b_2.SubscribeResponse;
import org.oasis_open.docs.wsn.b_2.Unsubscribe;
import org.oasis_open.docs.wsn.b_2.UnsubscribeResponse;
-import org.w3c.dom.Element;
/**
*
@@ -48,14 +49,18 @@
*/
public class PublisherComponent extends ComponentSupport implements MessageExchangeListener {
- private static final Log log = LogFactory.getLog(PublisherComponent.class);
-
+ private static final Log LOG = LogFactory.getLog(PublisherComponent.class);
+
private NotificationBroker wsnBroker;
+
private String topic;
+
private boolean demand;
+
private String subscriptionEndpoint = "subscription";
+
private Subscribe subscription;
-
+
/**
* @return Returns the demand.
*/
@@ -98,7 +103,7 @@
super.init();
getContext().activateEndpoint(getService(), subscriptionEndpoint);
}
-
+
/* (non-Javadoc)
* @see javax.jbi.management.LifeCycleMBean#start()
*/
@@ -107,24 +112,23 @@
public void run() {
try {
wsnBroker = new NotificationBroker(getContext());
- String wsaAddress = getService().getNamespaceURI() + "/" + getService().getLocalPart() + "/" + subscriptionEndpoint;
- wsnBroker.registerPublisher(AbstractWSAClient.createWSA(wsaAddress),
- topic,
- demand);
+ String wsaAddress = getService().getNamespaceURI() + "/" + getService().getLocalPart() + "/"
+ + subscriptionEndpoint;
+ wsnBroker.registerPublisher(AbstractWSAClient.createWSA(wsaAddress), topic, demand);
} catch (Exception e) {
- log.error("Could not create wsn client", e);
+ LOG.error("Could not create wsn client", e);
}
}
- }.start();
+ } .start();
}
-
+
/* (non-Javadoc)
* @see javax.jbi.management.LifeCycleMBean#shutDown()
*/
public void shutDown() throws JBIException {
super.shutDown();
}
-
+
/* (non-Javadoc)
* @see org.apache.servicemix.MessageExchangeListener#onMessageExchange(javax.jbi.messaging.MessageExchange)
*/
@@ -141,7 +145,8 @@
if (input instanceof Subscribe) {
subscription = (Subscribe) input;
SubscribeResponse response = new SubscribeResponse();
- String wsaAddress = getService().getNamespaceURI() + "/" + getService().getLocalPart() + "/" + subscriptionEndpoint;
+ String wsaAddress = getService().getNamespaceURI() + "/" + getService().getLocalPart() + "/"
+ + subscriptionEndpoint;
response.setSubscriptionReference(AbstractWSAClient.createWSA(wsaAddress));
StringWriter writer = new StringWriter();
jaxbContext.createMarshaller().marshal(response, writer);
@@ -164,7 +169,7 @@
} catch (Exception e) {
fail(exchange, e);
}
- // This is a notification to publish
+ // This is a notification to publish
} else {
try {
if (!demand || subscription != null) {
@@ -172,7 +177,7 @@
wsnBroker.notify(topic, elem);
done(exchange);
} else {
- log.info("Ingore notification as the publisher is no subscribers");
+ LOG.info("Ingore notification as the publisher is no subscribers");
}
} catch (Exception e) {
fail(exchange, e);
Modified: incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/spring/RegisterPublisherFactoryBean.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/spring/RegisterPublisherFactoryBean.java?view=diff&rev=526601&r1=526600&r2=526601
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/spring/RegisterPublisherFactoryBean.java (original)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/spring/RegisterPublisherFactoryBean.java Sun Apr 8 14:19:30 2007
@@ -30,9 +30,11 @@
public class RegisterPublisherFactoryBean implements FactoryBean {
private String publisher;
+
private String topic;
+
private boolean demand;
-
+
/**
* @return Returns the demand.
*/
@@ -86,7 +88,7 @@
topicExp.getContent().add(topic);
registerPublisher.getTopic().add(topicExp);
}
- registerPublisher.setDemand(new Boolean(demand));
+ registerPublisher.setDemand(Boolean.valueOf(demand));
return registerPublisher;
}
Modified: incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/spring/SubscribeFactoryBean.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/spring/SubscribeFactoryBean.java?view=diff&rev=526601&r1=526600&r2=526601
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/spring/SubscribeFactoryBean.java (original)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/spring/SubscribeFactoryBean.java Sun Apr 8 14:19:30 2007
@@ -36,10 +36,13 @@
public class SubscribeFactoryBean implements FactoryBean {
private String consumer;
+
private String topic;
+
private String xpath;
+
private boolean raw;
-
+
/**
* @return Returns the consumer.
*/
@@ -48,7 +51,8 @@
}
/**
- * @param consumer The consumer to set.
+ * @param consumer
+ * The consumer to set.
*/
public void setConsumer(String consumer) {
this.consumer = consumer;
@@ -62,7 +66,8 @@
}
/**
- * @param topic The topic to set.
+ * @param topic
+ * The topic to set.
*/
public void setTopic(String topic) {
this.topic = topic;
@@ -76,7 +81,8 @@
}
/**
- * @param xpath The xpath to set.
+ * @param xpath
+ * The xpath to set.
*/
public void setXpath(String xpath) {
this.xpath = xpath;
@@ -90,13 +96,16 @@
}
/**
- * @param raw The raw to set.
+ * @param raw
+ * The raw to set.
*/
public void setRaw(boolean raw) {
this.raw = raw;
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.springframework.beans.factory.FactoryBean#getObject()
*/
public Object getObject() throws Exception {
@@ -106,13 +115,17 @@
if (topic != null) {
TopicExpressionType topicExp = new TopicExpressionType();
topicExp.getContent().add(topic);
- subscribe.getFilter().getAny().add(new JAXBElement<TopicExpressionType>(AbstractSubscription.QNAME_TOPIC_EXPRESSION, TopicExpressionType.class, topicExp));
+ subscribe.getFilter().getAny().add(
+ new JAXBElement<TopicExpressionType>(AbstractSubscription.QNAME_TOPIC_EXPRESSION,
+ TopicExpressionType.class, topicExp));
}
if (xpath != null) {
QueryExpressionType xpathExp = new QueryExpressionType();
xpathExp.setDialect(AbstractSubscription.XPATH1_URI);
xpathExp.getContent().add(xpath);
- subscribe.getFilter().getAny().add(new JAXBElement<QueryExpressionType>(AbstractSubscription.QNAME_MESSAGE_CONTENT, QueryExpressionType.class, xpathExp));
+ subscribe.getFilter().getAny().add(
+ new JAXBElement<QueryExpressionType>(AbstractSubscription.QNAME_MESSAGE_CONTENT,
+ QueryExpressionType.class, xpathExp));
}
if (raw) {
subscribe.setSubscriptionPolicy(new Subscribe.SubscriptionPolicy());
@@ -121,14 +134,18 @@
return subscribe;
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.springframework.beans.factory.FactoryBean#getObjectType()
*/
public Class getObjectType() {
return Subscribe.class;
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.springframework.beans.factory.FactoryBean#isSingleton()
*/
public boolean isSingleton() {
Modified: incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/spring/WSNSpringComponent.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/spring/WSNSpringComponent.java?view=diff&rev=526601&r1=526600&r2=526601
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/spring/WSNSpringComponent.java (original)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/spring/WSNSpringComponent.java Sun Apr 8 14:19:30 2007
@@ -36,9 +36,9 @@
public class WSNSpringComponent extends BaseComponent {
private Resource[] resources;
-
- private Object[] requests;
-
+
+ private Object[] requests;
+
/**
* @return Returns the endpoints.
*/
@@ -52,7 +52,7 @@
public void setResources(Resource[] endpoints) {
this.resources = endpoints;
}
-
+
/**
* @return Returns the requests.
*/
@@ -80,7 +80,7 @@
public void setConnectionFactory(ConnectionFactory connectionFactory) {
((WSNLifeCycle) lifeCycle).setConnectionFactory(connectionFactory);
}
-
+
/* (non-Javadoc)
* @see org.servicemix.common.BaseComponent#createLifeCycle()
*/
@@ -91,11 +91,11 @@
public class LifeCycle extends WSNLifeCycle {
protected ServiceUnit su;
-
+
public LifeCycle() {
super(WSNSpringComponent.this);
}
-
+
/* (non-Javadoc)
* @see org.servicemix.common.BaseLifeCycle#doInit()
*/
@@ -128,7 +128,7 @@
super.doStart();
su.start();
}
-
+
/* (non-Javadoc)
* @see org.servicemix.common.BaseLifeCycle#doStop()
*/
@@ -136,7 +136,7 @@
su.stop();
super.doStop();
}
-
+
/* (non-Javadoc)
* @see org.servicemix.common.BaseLifeCycle#doShutDown()
*/
Modified: incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/DummySubscription.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/DummySubscription.java?view=diff&rev=526601&r1=526600&r2=526601
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/DummySubscription.java (original)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/DummySubscription.java Sun Apr 8 14:19:30 2007
@@ -25,24 +25,24 @@
public class DummySubscription extends AbstractSubscription {
- public DummySubscription(String name) {
- super(name);
- }
+ public DummySubscription(String name) {
+ super(name);
+ }
- @Override
- protected void start() throws SubscribeCreationFailedFault {
- }
+ @Override
+ protected void start() throws SubscribeCreationFailedFault {
+ }
- @Override
- protected void pause() throws PauseFailedFault {
- }
+ @Override
+ protected void pause() throws PauseFailedFault {
+ }
- @Override
- protected void resume() throws ResumeFailedFault {
- }
+ @Override
+ protected void resume() throws ResumeFailedFault {
+ }
- @Override
- protected void renew(XMLGregorianCalendar terminationTime) throws UnacceptableTerminationTimeFault {
- }
-
-}
\ No newline at end of file
+ @Override
+ protected void renew(XMLGregorianCalendar terminationTime) throws UnacceptableTerminationTimeFault {
+ }
+
+}
Modified: incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/SubscriptionTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/SubscriptionTest.java?view=diff&rev=526601&r1=526600&r2=526601
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/SubscriptionTest.java (original)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/SubscriptionTest.java Sun Apr 8 14:19:30 2007
@@ -32,81 +32,83 @@
public class SubscriptionTest extends TestCase {
- private JAXBContext context;
- private Unmarshaller unmarshaller;
- private AbstractSubscription subscription;
-
- protected void setUp() throws Exception {
- context = JAXBContext.newInstance(Subscribe.class);
- unmarshaller = context.createUnmarshaller();
- subscription = new DummySubscription("mySubscription");
- }
-
- protected Subscribe getSubscription(String file) throws JAXBException, IOException {
- InputStream is = getClass().getResourceAsStream(file);
- Subscribe subscribe = (Subscribe) unmarshaller.unmarshal(is);
- is.close();
- return subscribe;
- }
-
- public void testWithNilITT() throws Exception {
- Subscribe subscribe = getSubscription("subscribe-nil-itt.xml");
- subscription.validateSubscription(subscribe);
- }
-
- public void testWithAbsoluteITT() throws Exception {
- Subscribe subscribe = getSubscription("subscribe-abs-itt.xml");
- try {
- subscription.validateSubscription(subscribe);
- fail("Invalid initial termination time used. Fault was expected.");
- } catch (UnacceptableInitialTerminationTimeFault e) {
- // OK
- }
- }
-
- public void testWithEmptyITT() throws Exception {
- Subscribe subscribe = getSubscription("subscribe-empty-itt.xml");
- try {
- subscription.validateSubscription(subscribe);
- fail("Invalid initial termination time used. Fault was expected.");
- } catch (UnacceptableInitialTerminationTimeFault e) {
- // OK
- }
- }
-
- public void testWithNoITT() throws Exception {
- Subscribe subscribe = getSubscription("subscribe-no-itt.xml");
- subscription.validateSubscription(subscribe);
- }
-
- public void testWithUseRaw() throws Exception {
- Subscribe subscribe = getSubscription("subscribe-raw.xml");
- subscription.validateSubscription(subscribe);
- }
-
- public void testWithProducerProperties() throws Exception {
- Subscribe subscribe = getSubscription("subscribe-pp.xml");
- try {
- subscription.validateSubscription(subscribe);
- fail("ProducerProperties used. Fault was expected.");
- } catch (InvalidProducerPropertiesExpressionFault e) {
- // OK
- }
- }
-
- public void testWithNoTopic() throws Exception {
- Subscribe subscribe = getSubscription("subscribe-no-topic.xml");
- try {
- subscription.validateSubscription(subscribe);
- fail("ProducerProperties used. Fault was expected.");
- } catch (InvalidFilterFault e) {
- // OK
- }
- }
-
- public void testWithEPR() throws Exception {
- Subscribe subscribe = getSubscription("subscribe-epr.xml");
- subscription.validateSubscription(subscribe);
- }
-
+ private JAXBContext context;
+
+ private Unmarshaller unmarshaller;
+
+ private AbstractSubscription subscription;
+
+ protected void setUp() throws Exception {
+ context = JAXBContext.newInstance(Subscribe.class);
+ unmarshaller = context.createUnmarshaller();
+ subscription = new DummySubscription("mySubscription");
+ }
+
+ protected Subscribe getSubscription(String file) throws JAXBException, IOException {
+ InputStream is = getClass().getResourceAsStream(file);
+ Subscribe subscribe = (Subscribe) unmarshaller.unmarshal(is);
+ is.close();
+ return subscribe;
+ }
+
+ public void testWithNilITT() throws Exception {
+ Subscribe subscribe = getSubscription("subscribe-nil-itt.xml");
+ subscription.validateSubscription(subscribe);
+ }
+
+ public void testWithAbsoluteITT() throws Exception {
+ Subscribe subscribe = getSubscription("subscribe-abs-itt.xml");
+ try {
+ subscription.validateSubscription(subscribe);
+ fail("Invalid initial termination time used. Fault was expected.");
+ } catch (UnacceptableInitialTerminationTimeFault e) {
+ // OK
+ }
+ }
+
+ public void testWithEmptyITT() throws Exception {
+ Subscribe subscribe = getSubscription("subscribe-empty-itt.xml");
+ try {
+ subscription.validateSubscription(subscribe);
+ fail("Invalid initial termination time used. Fault was expected.");
+ } catch (UnacceptableInitialTerminationTimeFault e) {
+ // OK
+ }
+ }
+
+ public void testWithNoITT() throws Exception {
+ Subscribe subscribe = getSubscription("subscribe-no-itt.xml");
+ subscription.validateSubscription(subscribe);
+ }
+
+ public void testWithUseRaw() throws Exception {
+ Subscribe subscribe = getSubscription("subscribe-raw.xml");
+ subscription.validateSubscription(subscribe);
+ }
+
+ public void testWithProducerProperties() throws Exception {
+ Subscribe subscribe = getSubscription("subscribe-pp.xml");
+ try {
+ subscription.validateSubscription(subscribe);
+ fail("ProducerProperties used. Fault was expected.");
+ } catch (InvalidProducerPropertiesExpressionFault e) {
+ // OK
+ }
+ }
+
+ public void testWithNoTopic() throws Exception {
+ Subscribe subscribe = getSubscription("subscribe-no-topic.xml");
+ try {
+ subscription.validateSubscription(subscribe);
+ fail("ProducerProperties used. Fault was expected.");
+ } catch (InvalidFilterFault e) {
+ // OK
+ }
+ }
+
+ public void testWithEPR() throws Exception {
+ Subscribe subscribe = getSubscription("subscribe-epr.xml");
+ subscription.validateSubscription(subscribe);
+ }
+
}
Modified: incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/component/WSNComponentTest.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/component/WSNComponentTest.java?view=diff&rev=526601&r1=526600&r2=526601
==============================================================================
--- incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/component/WSNComponentTest.java (original)
+++ incubator/servicemix/trunk/deployables/serviceengines/servicemix-wsn2005/src/test/java/org/apache/servicemix/wsn/component/WSNComponentTest.java Sun Apr 8 14:19:30 2007
@@ -28,6 +28,11 @@
import javax.xml.namespace.QName;
import javax.xml.parsers.DocumentBuilder;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.xml.sax.InputSource;
+
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
@@ -47,14 +52,11 @@
import org.oasis_open.docs.wsn.b_2.Notify;
import org.w3._2005._08.addressing.AttributedURIType;
import org.w3._2005._08.addressing.EndpointReferenceType;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.Node;
-import org.xml.sax.InputSource;
public class WSNComponentTest extends TestCase {
- public static QName NOTIFICATION_BROKER = new QName("http://servicemix.org/wsnotification", "NotificationBroker");
+ public static final QName NOTIFICATION_BROKER =
+ new QName("http://servicemix.org/wsnotification", "NotificationBroker");
private JBIContainer jbi;
private BrokerService jmsBroker;