You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/02/23 18:25:43 UTC

svn commit: r380168 - in /incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn: component/WSNLifeCycle.java jbi/JbiNotificationBroker.java jbi/JbiPublisher.java jbi/JbiSubscription.java

Author: gnodet
Date: Thu Feb 23 09:25:33 2006
New Revision: 380168

URL: http://svn.apache.org/viewcvs?rev=380168&view=rev
Log:
Dispatch notifications asynchronously using a send instead of a sendSync

Modified:
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/component/WSNLifeCycle.java
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jbi/JbiNotificationBroker.java
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jbi/JbiPublisher.java
    incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jbi/JbiSubscription.java

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/component/WSNLifeCycle.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/component/WSNLifeCycle.java?rev=380168&r1=380167&r2=380168&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/component/WSNLifeCycle.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/component/WSNLifeCycle.java Thu Feb 23 09:25:33 2006
@@ -54,7 +54,7 @@
 		super.doInit();
         // Notification Broker
         notificationBroker = new JbiNotificationBroker(configuration.getBrokerName());
-        notificationBroker.setContext(context);
+        notificationBroker.setLifeCycle(this);
         notificationBroker.setManager(new WSNEndpointManager());
         if (connectionFactory == null) {
             connectionFactory = lookupConnectionFactory();

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jbi/JbiNotificationBroker.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jbi/JbiNotificationBroker.java?rev=380168&r1=380167&r2=380168&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jbi/JbiNotificationBroker.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jbi/JbiNotificationBroker.java Thu Feb 23 09:25:33 2006
@@ -15,15 +15,14 @@
  */
 package org.apache.servicemix.wsn.jbi;
 
-import javax.jbi.component.ComponentContext;
-
+import org.apache.servicemix.wsn.component.WSNLifeCycle;
 import org.apache.servicemix.wsn.jms.JmsNotificationBroker;
 import org.apache.servicemix.wsn.jms.JmsPublisher;
 import org.apache.servicemix.wsn.jms.JmsSubscription;
 
 public class JbiNotificationBroker extends JmsNotificationBroker {
 
-	private ComponentContext context;
+	private WSNLifeCycle lifeCycle;
 	
 	public JbiNotificationBroker(String name) {
 		super(name);
@@ -32,24 +31,24 @@
 	@Override
 	protected JmsSubscription createJmsSubscription(String name) {
 		JbiSubscription subscription = new JbiSubscription(name);
-		subscription.setContext(context);
+		subscription.setLifeCycle(lifeCycle);
 		return subscription;
 	}
 
 	@Override
 	protected JmsPublisher createJmsPublisher(String name) {
 		JbiPublisher publisher = new JbiPublisher(name);
-		publisher.setContext(context);
+		publisher.setLifeCycle(lifeCycle);
 		publisher.setNotificationBrokerAddress(address);
 		return publisher;
 	}
 
-	public ComponentContext getContext() {
-		return context;
-	}
-
-	public void setContext(ComponentContext context) {
-		this.context = context;
-	}
+    public WSNLifeCycle getLifeCycle() {
+        return lifeCycle;
+    }
+
+    public void setLifeCycle(WSNLifeCycle lifeCycle) {
+        this.lifeCycle = lifeCycle;
+    }
 
 }

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jbi/JbiPublisher.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jbi/JbiPublisher.java?rev=380168&r1=380167&r2=380168&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jbi/JbiPublisher.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jbi/JbiPublisher.java Thu Feb 23 09:25:33 2006
@@ -24,6 +24,7 @@
 import org.apache.servicemix.wsn.client.AbstractWSAClient;
 import org.apache.servicemix.wsn.client.NotificationBroker;
 import org.apache.servicemix.wsn.client.Subscription;
+import org.apache.servicemix.wsn.component.WSNLifeCycle;
 import org.apache.servicemix.wsn.jaxws.InvalidTopicExpressionFault;
 import org.apache.servicemix.wsn.jaxws.PublisherRegistrationFailedFault;
 import org.apache.servicemix.wsn.jaxws.PublisherRegistrationRejectedFault;
@@ -35,7 +36,7 @@
 
 public class JbiPublisher extends JmsPublisher {
 
-	private ComponentContext context;
+    private WSNLifeCycle lifeCycle;
 	private ServiceEndpoint endpoint;
 	private String notificationBrokerAddress;
 	
@@ -43,10 +44,6 @@
 		super(name);
 	}
 
-	public void setContext(ComponentContext context) {
-		this.context = context;
-	}
-
 	public String getNotificationBrokerAddress() {
 		return notificationBrokerAddress;
 	}
@@ -59,7 +56,7 @@
 	protected Object startSubscription() {
 		Subscription subscription = null;
 		try {
-			NotificationBroker broker = new NotificationBroker(context);
+			NotificationBroker broker = new NotificationBroker(getContext());
 			broker.setResolver(AbstractWSAClient.resolveWSA(publisherReference));
 			subscription = broker.subscribe(AbstractWSAClient.createWSA(notificationBrokerAddress), 
 														 "noTopic", null);
@@ -87,7 +84,7 @@
 	protected void validatePublisher(RegisterPublisher registerPublisherRequest) throws InvalidTopicExpressionFault, PublisherRegistrationFailedFault, PublisherRegistrationRejectedFault, ResourceUnknownFault, TopicNotSupportedFault {
 		super.validatePublisher(registerPublisherRequest);
 		String[] parts = split(publisherReference.getAddress().getValue());
-		endpoint = context.getEndpoint(new QName(parts[0], parts[1]), parts[2]);
+		endpoint = getContext().getEndpoint(new QName(parts[0], parts[1]), parts[2]);
 		if (endpoint == null) {
 			PublisherRegistrationFailedFaultType fault = new PublisherRegistrationFailedFaultType();
 			throw new PublisherRegistrationFailedFault("Unable to resolve consumer reference endpoint", fault);
@@ -107,6 +104,18 @@
 		String svcName = uri.substring(idx2 + 1, idx1);
 		String nsUri   = uri.substring(0, idx2);
     	return new String[] { nsUri, svcName, epName };
+    }
+
+    public ComponentContext getContext() {
+        return lifeCycle.getContext();
+    }
+
+    public WSNLifeCycle getLifeCycle() {
+        return lifeCycle;
+    }
+
+    public void setLifeCycle(WSNLifeCycle lifeCycle) {
+        this.lifeCycle = lifeCycle;
     }
 
 }

Modified: incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jbi/JbiSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jbi/JbiSubscription.java?rev=380168&r1=380167&r2=380168&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jbi/JbiSubscription.java (original)
+++ incubator/servicemix/trunk/servicemix-wsn2005/src/main/java/org/apache/servicemix/wsn/jbi/JbiSubscription.java Thu Feb 23 09:25:33 2006
@@ -19,6 +19,7 @@
 import javax.jbi.component.ComponentContext;
 import javax.jbi.messaging.DeliveryChannel;
 import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessageExchangeFactory;
 import javax.jbi.messaging.NormalizedMessage;
 import javax.jbi.servicedesc.ServiceEndpoint;
@@ -27,6 +28,8 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.common.ExchangeProcessor;
+import org.apache.servicemix.wsn.component.WSNLifeCycle;
 import org.apache.servicemix.wsn.jaxws.InvalidFilterFault;
 import org.apache.servicemix.wsn.jaxws.InvalidMessageContentExpressionFault;
 import org.apache.servicemix.wsn.jaxws.InvalidProducerPropertiesExpressionFault;
@@ -44,11 +47,13 @@
 
 	private static Log log = LogFactory.getLog(JbiSubscription.class);
 	
-	private ComponentContext context;
+	private WSNLifeCycle lifeCycle;
 	private ServiceEndpoint endpoint;
+    private ExchangeProcessor processor;
 	
 	public JbiSubscription(String name) {
 		super(name);
+        processor = new NoOpProcessor();
 	}
 
     @Override
@@ -60,7 +65,7 @@
 	protected void validateSubscription(Subscribe subscribeRequest) throws InvalidFilterFault, InvalidMessageContentExpressionFault, InvalidProducerPropertiesExpressionFault, InvalidTopicExpressionFault, SubscribeCreationFailedFault, TopicExpressionDialectUnknownFault, TopicNotSupportedFault, UnacceptableInitialTerminationTimeFault {
 		super.validateSubscription(subscribeRequest);
         String[] parts = split(consumerReference.getAddress().getValue().trim());
-        endpoint = context.getEndpoint(new QName(parts[0], parts[1]), parts[2]);
+        endpoint = getContext().getEndpoint(new QName(parts[0], parts[1]), parts[2]);
         if (endpoint == null) {
             SubscribeCreationFailedFaultType fault = new SubscribeCreationFailedFaultType();
             throw new SubscribeCreationFailedFault("Unable to resolve consumer reference endpoint", fault);
@@ -83,29 +88,42 @@
     }
 	
 	@Override
-	protected void doNotify(Element content) {
-		try {
-			DeliveryChannel channel = context.getDeliveryChannel();
-			MessageExchangeFactory factory = channel.createExchangeFactory(endpoint);
-			InOnly inonly = factory.createInOnlyExchange();
-			NormalizedMessage msg = inonly.createMessage();
-			inonly.setInMessage(msg);
-			msg.setContent(new DOMSource(content));
-			if (!channel.sendSync(inonly)) {
-				log.warn("Notification was aborted");
-			}
-		} catch (JBIException e) {
-			log.warn("Could not deliver notification", e);
-		}
+	protected void doNotify(final Element content) {
+        try {
+            DeliveryChannel channel = getContext().getDeliveryChannel();
+            MessageExchangeFactory factory = channel.createExchangeFactory(endpoint);
+            InOnly inonly = factory.createInOnlyExchange();
+            NormalizedMessage msg = inonly.createMessage();
+            inonly.setInMessage(msg);
+            msg.setContent(new DOMSource(content));
+            getLifeCycle().sendConsumerExchange(inonly, processor);
+        } catch (JBIException e) {
+            log.warn("Could not deliver notification", e);
+        }
 	}
 
 	public ComponentContext getContext() {
-		return context;
+		return lifeCycle.getContext();
 	}
 
-	public void setContext(ComponentContext context) {
-		this.context = context;
-	}
+    public WSNLifeCycle getLifeCycle() {
+        return lifeCycle;
+    }
+
+    public void setLifeCycle(WSNLifeCycle lifeCycle) {
+        this.lifeCycle = lifeCycle;
+    }
+
+    protected class NoOpProcessor implements ExchangeProcessor {
 
+        public void process(MessageExchange exchange) throws Exception {
+        }
+
+        public void start() throws Exception {
+        }
+
+        public void stop() throws Exception {
+        }
+    }
 
 }