You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuscany.apache.org by ed...@apache.org on 2011/01/11 15:16:44 UTC

svn commit: r1057651 - in /tuscany/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms: headers/ provider/ transport/ wire/

Author: edwardsmj
Date: Tue Jan 11 14:16:43 2011
New Revision: 1057651

URL: http://svn.apache.org/viewvc?rev=1057651&view=rev
Log:
Complete enablement of the JMS Binding to deal with Async invocations - as under TUSCANY-3809

Modified:
    tuscany/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/headers/HeaderServiceInterceptor.java
    tuscany/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingAsyncResponseInvoker.java
    tuscany/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java
    tuscany/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportServiceInterceptor.java
    tuscany/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wire/AsyncResponseDestinationInterceptor.java

Modified: tuscany/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/headers/HeaderServiceInterceptor.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/headers/HeaderServiceInterceptor.java?rev=1057651&r1=1057650&r2=1057651&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/headers/HeaderServiceInterceptor.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/headers/HeaderServiceInterceptor.java Tue Jan 11 14:16:43 2011
@@ -24,6 +24,9 @@ import javax.jms.JMSException;
 
 import org.apache.tuscany.sca.binding.jms.JMSBinding;
 import org.apache.tuscany.sca.binding.jms.JMSBindingException;
+import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessor;
+import org.apache.tuscany.sca.binding.jms.provider.JMSMessageProcessorUtil;
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
 import org.apache.tuscany.sca.core.invocation.InterceptorAsyncImpl;
 import org.apache.tuscany.sca.interfacedef.Operation;
 import org.apache.tuscany.sca.invocation.Interceptor;
@@ -34,10 +37,12 @@ public class HeaderServiceInterceptor ex
 
     private Invoker next;
     private JMSBinding jmsBinding;
+    private JMSMessageProcessor responseMessageProcessor;
 
-    public HeaderServiceInterceptor(JMSBinding jmsBinding) {
+    public HeaderServiceInterceptor(ExtensionPointRegistry extensions, JMSBinding jmsBinding) {
         super();
         this.jmsBinding = jmsBinding;
+        this.responseMessageProcessor = JMSMessageProcessorUtil.getResponseMessageProcessor(extensions, jmsBinding);
     }
 
     public Message invoke(Message msg) {
@@ -72,6 +77,8 @@ public class HeaderServiceInterceptor ex
             
             Operation operation = tuscanyMsg.getOperation();
             String operationName = operation.getName();
+            
+            responseMessageProcessor.setOperationName(operationName, jmsMsg);
 
             for (String propName : jmsBinding.getPropertyNames()) {
                 Object value = jmsBinding.getProperty(propName);

Modified: tuscany/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingAsyncResponseInvoker.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingAsyncResponseInvoker.java?rev=1057651&r1=1057650&r2=1057651&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingAsyncResponseInvoker.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingAsyncResponseInvoker.java Tue Jan 11 14:16:43 2011
@@ -38,6 +38,7 @@ public class JMSBindingAsyncResponseInvo
     } // end constructor
     
     public void invokeAsyncResponse(Message msg) {
-         // TODO
+         // Deliberately left null since in JMS the TransportServiceInterceptor does all the work
     } // end method invokeAsyncResponse
+    
 } // end class JMSBindingAsyncResponseInvoker
\ No newline at end of file

Modified: tuscany/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java?rev=1057651&r1=1057650&r2=1057651&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/provider/JMSBindingServiceBindingProvider.java Tue Jan 11 14:16:43 2011
@@ -205,11 +205,11 @@ public class JMSBindingServiceBindingPro
         bindingChain.addInterceptor(Phase.SERVICE_BINDING_WIREFORMAT,
                                     new CallbackDestinationInterceptor(endpoint));
 
-        bindingChain.addInterceptor(Phase.SERVICE_BINDING_WIREFORMAT, new HeaderServiceInterceptor(jmsBinding));
+        bindingChain.addInterceptor(Phase.SERVICE_BINDING_WIREFORMAT, new HeaderServiceInterceptor(registry, jmsBinding));
 
         // add async response interceptor after header interceptor
         bindingChain.addInterceptor(Phase.SERVICE_BINDING_WIREFORMAT,
-                                    new AsyncResponseDestinationInterceptor(endpoint));
+                                    new AsyncResponseDestinationInterceptor(endpoint, registry) );
 
         // add request wire format
         bindingChain.addInterceptor(requestWireFormatProvider.getPhase(), 

Modified: tuscany/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportServiceInterceptor.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportServiceInterceptor.java?rev=1057651&r1=1057650&r2=1057651&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportServiceInterceptor.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/transport/TransportServiceInterceptor.java Tue Jan 11 14:16:43 2011
@@ -210,10 +210,11 @@ public class TransportServiceInterceptor
         JMSBindingContext context = msg.getBindingContext();
         try {
             Session session = context.getJmsResponseSession();
-            javax.jms.Message requestJMSMsg = context.getJmsMsg();
+            //javax.jms.Message requestJMSMsg = context.getJmsMsg();
             javax.jms.Message responseJMSMsg = msg.getBody();
             
-            Destination replyDest = requestJMSMsg.getJMSReplyTo();
+            //Destination replyDest = requestJMSMsg.getJMSReplyTo();
+            Destination replyDest = context.getReplyToDestination();
             if (replyDest == null) {
                 if (jmsBinding.getResponseDestinationName() != null) {
                     try {
@@ -251,22 +252,24 @@ public class TransportServiceInterceptor
                 }
             }
     
+            /*
             if (correlationScheme == null || 
                 JMSBindingConstants.CORRELATE_MSG_ID.equalsIgnoreCase(correlationScheme)) {
                 responseJMSMsg.setJMSCorrelationID(requestJMSMsg.getJMSMessageID());
             } else if (JMSBindingConstants.CORRELATE_CORRELATION_ID.equalsIgnoreCase(correlationScheme)) {
                 responseJMSMsg.setJMSCorrelationID(requestJMSMsg.getJMSCorrelationID());
-            }                
+            }  
+            */              
                        
             MessageProducer producer = session.createProducer(replyDest);
             
             // Set jms header attributes in producer, not message.
-            int deliveryMode = requestJMSMsg.getJMSDeliveryMode();
-            producer.setDeliveryMode(deliveryMode);
-            int deliveryPriority = requestJMSMsg.getJMSPriority();
-            producer.setPriority(deliveryPriority);
-            long timeToLive = requestJMSMsg.getJMSExpiration();
-            producer.setTimeToLive(timeToLive);
+            //int deliveryMode = requestJMSMsg.getJMSDeliveryMode();
+            //producer.setDeliveryMode(deliveryMode);
+            //int deliveryPriority = requestJMSMsg.getJMSPriority();
+            //producer.setPriority(deliveryPriority);
+            //long timeToLive = requestJMSMsg.getJMSExpiration();
+            //producer.setTimeToLive(timeToLive);
     
             producer.send((javax.jms.Message)msg.getBody());
     

Modified: tuscany/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wire/AsyncResponseDestinationInterceptor.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wire/AsyncResponseDestinationInterceptor.java?rev=1057651&r1=1057650&r2=1057651&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wire/AsyncResponseDestinationInterceptor.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/binding-jms-runtime/src/main/java/org/apache/tuscany/sca/binding/jms/wire/AsyncResponseDestinationInterceptor.java Tue Jan 11 14:16:43 2011
@@ -33,11 +33,14 @@ import org.apache.tuscany.sca.binding.jm
 import org.apache.tuscany.sca.binding.jms.JMSBindingException;
 import org.apache.tuscany.sca.binding.jms.context.JMSBindingContext;
 import org.apache.tuscany.sca.binding.jms.provider.JMSResourceFactory;
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.FactoryExtensionPoint;
 import org.apache.tuscany.sca.core.invocation.AsyncResponseInvoker;
 import org.apache.tuscany.sca.core.invocation.InterceptorAsyncImpl;
 import org.apache.tuscany.sca.invocation.Interceptor;
 import org.apache.tuscany.sca.invocation.Invoker;
 import org.apache.tuscany.sca.invocation.Message;
+import org.apache.tuscany.sca.invocation.MessageFactory;
 import org.apache.tuscany.sca.policy.Intent;
 import org.apache.tuscany.sca.runtime.RuntimeComponentService;
 import org.apache.tuscany.sca.runtime.RuntimeEndpoint;
@@ -50,11 +53,13 @@ public class AsyncResponseDestinationInt
     private Invoker next;
     private RuntimeComponentService service;
 	private RuntimeEndpoint endpoint;
+	private ExtensionPointRegistry registry;
           
-    public AsyncResponseDestinationInterceptor(RuntimeEndpoint endpoint) {
+    public AsyncResponseDestinationInterceptor(RuntimeEndpoint endpoint, ExtensionPointRegistry registry) {
         super();
         this.service = (RuntimeComponentService) endpoint.getService();
         this.endpoint = endpoint;
+        this.registry = registry;
     }
 
     public Invoker getNext() {
@@ -104,8 +109,11 @@ public class AsyncResponseDestinationInt
             // than this interceptor
             String msgID = (String)msg.getHeaders().get("MESSAGE_ID");
             
+            String operationName = msg.getOperation().getName();
+            
             // Create a response invoker and add it to the message headers
-            AsyncResponseInvoker<String> respInvoker = new AsyncResponseInvoker<String>(endpoint, null, asyncRespAddr, msgID);
+            AsyncResponseInvoker<String> respInvoker = 
+            	new AsyncResponseInvoker<String>(endpoint, null, asyncRespAddr, msgID, operationName, getMessageFactory());
             msg.getHeaders().put("ASYNC_RESPONSE_INVOKER", respInvoker);
 
         } catch (JMSException e) {
@@ -191,4 +199,10 @@ public class AsyncResponseDestinationInt
     	} // end while
     	return false;
     } // end method isAsync
+    
+	private MessageFactory getMessageFactory() {
+		FactoryExtensionPoint modelFactories = registry.getExtensionPoint(FactoryExtensionPoint.class);
+		return modelFactories.getFactory(MessageFactory.class);
+	} // end method getMessageFactory
+	
 } // end class
\ No newline at end of file