You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuscany.apache.org by ed...@apache.org on 2011/01/20 17:52:03 UTC

svn commit: r1061385 - in /tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core: assembly/impl/ context/impl/ invocation/ invocation/impl/

Author: edwardsmj
Date: Thu Jan 20 16:52:03 2011
New Revision: 1061385

URL: http://svn.apache.org/viewvc?rev=1061385&view=rev
Log:
Stage 1 of Axis2 binding-ws support of async callbacks, as described in TUSCANY-3821

Modified:
    tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java
    tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/context/impl/CallbackServiceReferenceImpl.java
    tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java
    tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/Constants.java
    tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java
    tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKCallbackInvocationHandler.java

Modified: tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java?rev=1061385&r1=1061384&r2=1061385&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/assembly/impl/RuntimeEndpointImpl.java Thu Jan 20 16:52:03 2011
@@ -62,6 +62,7 @@ import org.apache.tuscany.sca.core.Facto
 import org.apache.tuscany.sca.core.UtilityExtensionPoint;
 import org.apache.tuscany.sca.core.assembly.RuntimeAssemblyFactory;
 import org.apache.tuscany.sca.core.invocation.AsyncResponseService;
+import org.apache.tuscany.sca.core.invocation.Constants;
 import org.apache.tuscany.sca.core.invocation.ExtensibleWireProcessor;
 import org.apache.tuscany.sca.core.invocation.NonBlockingInterceptor;
 import org.apache.tuscany.sca.core.invocation.RuntimeInvoker;
@@ -279,10 +280,11 @@ public class RuntimeEndpointImpl extends
     	// Deal with async callback
     	// Ensure invocation chains are built...
     	getInvocationChains();
-    	if ( !this.getCallbackEndpointReferences().isEmpty() ) {
+    	// async callback handling
+    	if( this.isAsyncInvocation() && !this.getCallbackEndpointReferences().isEmpty() ) {
     		RuntimeEndpointReference asyncEPR = (RuntimeEndpointReference) this.getCallbackEndpointReferences().get(0);
     		// Place a link to the callback EPR into the message headers...
-    		msg.getHeaders().put("ASYNC_CALLBACK", asyncEPR );
+    		msg.getHeaders().put(Constants.ASYNC_CALLBACK, asyncEPR );
     	} 
     	// end of async callback handling
         return invoker.invokeBinding(msg);
@@ -372,7 +374,7 @@ public class RuntimeEndpointImpl extends
             
             // Handle cases where the operation is an async server 
             if( targetOperation.isAsyncServer() ) {
-            	createAsyncServerCallback( this, operation );
+            	createAsyncServerCallback();
             } // end if
         }
 
@@ -412,24 +414,32 @@ public class RuntimeEndpointImpl extends
     } // end method initInvocationChains
     
     /**
-     * Creates the async callback for the supplied Endpoint and Operation, if it does not already exist
+     * Creates the async callback for this Endpoint, if it does not already exist
      * and stores it into the Endpoint
-     * @param endpoint - the Endpoint
-     * @param operation - the Operation
      */
-    private void createAsyncServerCallback( RuntimeEndpoint endpoint, Operation operation ) {
+    public void createAsyncServerCallback( ) {
     	// No need to create a callback if the Binding supports async natively...
-    	if( hasNativeAsyncBinding(endpoint) ) return;
+    	if( hasNativeAsyncBinding(this) ) return;
     	
     	// Check to see if the callback already exists
-    	if( asyncCallbackExists( endpoint ) ) return;
+    	if( asyncCallbackExists( this ) ) return;
     	
-    	RuntimeEndpointReference asyncEPR = createAsyncEPR( endpoint );
+    	RuntimeEndpointReference asyncEPR = createAsyncEPR( this );
     	
     	// Store the new callback EPR into the Endpoint
-    	endpoint.getCallbackEndpointReferences().add(asyncEPR);
+    	this.getCallbackEndpointReferences().add(asyncEPR);
+    	
+    	// Also store the callback EPR into the EndpointRegistry
+    	EndpointRegistry epReg = getEndpointRegistry( registry );
+    	if( epReg != null ) epReg.addEndpointReference(asyncEPR);
     } // end method createAsyncServerCallback
     
+    public RuntimeEndpointReference getAsyncServerCallback() {
+    	
+    	return (RuntimeEndpointReference) this.getCallbackEndpointReferences().get(0);
+    } // end method getAsyncServerCallback
+    
+    
     /**
      * Indicates if a given endpoint has a Binding that supports native async invocation
      * @param endpoint - the endpoint
@@ -455,6 +465,9 @@ public class RuntimeEndpointImpl extends
         RuntimeEndpointReference epr = (RuntimeEndpointReference)assemblyFactory.createEndpointReference();
         epr.bind( compositeContext );
         
+        // Create pseudo-component
+        epr.setComponent(component);
+        
         // Create pseudo-reference
         ComponentReference reference = assemblyFactory.createComponentReference();
     	  ExtensionPointRegistry registry = compositeContext.getExtensionPointRegistry();
@@ -487,8 +500,6 @@ public class RuntimeEndpointImpl extends
 		// Need to establish policies here (binding has some...)
 		epr.getRequiredIntents().addAll( endpoint.getRequiredIntents() );
 		epr.getPolicySets().addAll( endpoint.getPolicySets() );
-		String eprURI = endpoint.getComponent().getName() + "#reference-binding(" + referenceName + "/" + referenceName + ")";
-		epr.setURI(eprURI);
 		
 		// Attach a dummy endpoint to the epr
 		RuntimeEndpoint ep = (RuntimeEndpoint)assemblyFactory.createEndpoint();
@@ -497,6 +508,10 @@ public class RuntimeEndpointImpl extends
 		//epr.setStatus(EndpointReference.Status.RESOLVED_BINDING);
 		epr.setStatus(EndpointReference.Status.WIRED_TARGET_FOUND_AND_MATCHED);
 		epr.setUnresolved(false);
+		
+		// Set the URI for the EPR
+		String eprURI = endpoint.getComponent().getName() + "#reference-binding(" + referenceName + "/" + referenceName + ")";
+		epr.setURI(eprURI);
         
     	return epr;
     } // end method RuntimeEndpointReference
@@ -536,10 +551,10 @@ public class RuntimeEndpointImpl extends
 			XMLStreamReader reader = inputFactory.createXMLStreamReader(source);
 			reader.next();
 			Binding newBinding = (Binding) processor.read(reader, context );
-			newBinding.setName("asyncCallback");
+			newBinding.setName(reference.getName());
 			
 			// Create a URI address for the callback based on the Component_Name/Reference_Name pattern
-			String callbackURI = "/" + component.getName() + "/" + reference.getName();
+			//String callbackURI = "/" + component.getName() + "/" + reference.getName();
 			//newBinding.setURI(callbackURI);
 			
 			BuilderExtensionPoint builders = registry.getExtensionPoint(BuilderExtensionPoint.class);
@@ -934,13 +949,7 @@ public class RuntimeEndpointImpl extends
      */
     private RuntimeEndpointImpl findActualEP(RuntimeEndpointImpl ep,
 			ExtensionPointRegistry registry) {
-		// Get the EndpointRegistry
-        DomainRegistryFactory domainRegistryFactory = ExtensibleDomainRegistryFactory.getInstance(registry);
-        
-        if( domainRegistryFactory == null ) return null;
-        
-        // TODO: For the moment, just use the first (and only!) EndpointRegistry...
-        EndpointRegistry endpointRegistry = (EndpointRegistry) domainRegistryFactory.getEndpointRegistries().toArray()[0];
+		EndpointRegistry endpointRegistry = getEndpointRegistry( registry );
         
         if( endpointRegistry == null ) return null;
         
@@ -951,6 +960,22 @@ public class RuntimeEndpointImpl extends
         
 		return null;
 	} // end method findActualEP
+    
+    /**
+     * Get the EndpointRegistry
+     * @param registry - the ExtensionPoint registry
+     * @return the EndpointRegistry - will be null if the EndpointRegistry cannot be found
+     */
+    private EndpointRegistry getEndpointRegistry( ExtensionPointRegistry registry) {
+        DomainRegistryFactory domainRegistryFactory = ExtensibleDomainRegistryFactory.getInstance(registry);
+        
+        if( domainRegistryFactory == null ) return null;
+        
+        // TODO: For the moment, just use the first (and only!) EndpointRegistry...
+        EndpointRegistry endpointRegistry = (EndpointRegistry) domainRegistryFactory.getEndpointRegistries().toArray()[0];
+    	
+    	return endpointRegistry;
+    } // end method getEndpointRegistry
 
 	public InterfaceContract getBindingInterfaceContract() {
         resolve();

Modified: tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/context/impl/CallbackServiceReferenceImpl.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/context/impl/CallbackServiceReferenceImpl.java?rev=1061385&r1=1061384&r2=1061385&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/context/impl/CallbackServiceReferenceImpl.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/context/impl/CallbackServiceReferenceImpl.java Thu Jan 20 16:52:03 2011
@@ -24,6 +24,7 @@ import org.apache.tuscany.sca.assembly.E
 import org.apache.tuscany.sca.assembly.EndpointReference;
 import org.apache.tuscany.sca.context.CompositeContext;
 import org.apache.tuscany.sca.context.ThreadMessageContext;
+import org.apache.tuscany.sca.core.invocation.Constants;
 import org.apache.tuscany.sca.invocation.Message;
 import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
 import org.oasisopen.sca.ServiceRuntimeException;
@@ -32,8 +33,18 @@ public class CallbackServiceReferenceImp
     private RuntimeEndpointReference callbackEPR;
     private List<? extends EndpointReference> callbackEPRs;
     private Endpoint resolvedEndpoint;
+    // Holds the ID of the Message that caused the creation of this CallbackServiceReference
+    private String msgID;
 
-    /*
+    /** 
+     * Gets the message ID associated with this callback reference
+     * @return the message ID
+     */
+    public String getMsgID() {
+		return msgID;
+	}
+
+	/*
      * Public constructor for Externalizable serialization/deserialization
      */
     public CallbackServiceReferenceImpl() {
@@ -62,6 +73,9 @@ public class CallbackServiceReferenceImp
             throw new ServiceRuntimeException("No callback binding found for " + msgContext.getTo().toString());
         }
         resolvedEndpoint = msgContext.getFrom().getCallbackEndpoint();
+        
+        // Capture the Message ID from the message which caused the creation of this CallbackServiceReference
+        this.msgID = (String) msgContext.getHeaders().get(Constants.MESSAGE_ID);
     }
 
     @Override

Modified: tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java?rev=1061385&r1=1061384&r2=1061385&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/AsyncResponseInvoker.java Thu Jan 20 16:52:03 2011
@@ -23,6 +23,7 @@ import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.tuscany.sca.assembly.Endpoint;
 import org.apache.tuscany.sca.context.CompositeContext;
 import org.apache.tuscany.sca.core.ExtensionPointRegistry;
 import org.apache.tuscany.sca.core.FactoryExtensionPoint;
@@ -58,6 +59,7 @@ public class AsyncResponseInvoker<T> imp
     private String operationName;
     private MessageFactory messageFactory;
     private String bindingType = "";
+    private boolean isNativeAsync;
     
     public AsyncResponseInvoker(RuntimeEndpoint requestEndpoint,
 			RuntimeEndpointReference responseEndpointReference,
@@ -70,6 +72,13 @@ public class AsyncResponseInvoker<T> imp
 		this.relatesToMsgID = relatesToMsgID;
 		this.operationName = operationName;
 		this.messageFactory = messageFactory;
+		
+        if ((requestEndpoint.getBindingProvider() instanceof EndpointAsyncProvider) &&
+                (((EndpointAsyncProvider)requestEndpoint.getBindingProvider()).supportsNativeAsync())){
+        	isNativeAsync = true;
+        } else {
+        	isNativeAsync = false;
+        } // end if
 	} // end constructor
 
     /** 
@@ -79,8 +88,7 @@ public class AsyncResponseInvoker<T> imp
     	responseMessage.getHeaders().put(Constants.ASYNC_RESPONSE_INVOKER, this);
     	responseMessage.getHeaders().put(Constants.RELATES_TO, relatesToMsgID);
     	
-        if ((requestEndpoint.getBindingProvider() instanceof EndpointAsyncProvider) &&
-             (((EndpointAsyncProvider)requestEndpoint.getBindingProvider()).supportsNativeAsync())){
+        if (isNativeAsync){
             // process the response as a native async response
             requestEndpoint.invokeAsyncResponse(responseMessage);
         } else {
@@ -106,8 +114,8 @@ public class AsyncResponseInvoker<T> imp
 	}
 
 	/**
-     * If you have Java beans you can call this and we'll create
-     * a Tuscany message
+     * Invokes the async response where the parameter is Java bean(s) 
+     * - this method creates a Tuscany message
      * 
      * @param args the response data
      */
@@ -115,10 +123,22 @@ public class AsyncResponseInvoker<T> imp
         
         Message msg = messageFactory.createMessage();
 
-        msg.setOperation(getOperation());
+        msg.setOperation(getOperation( args ));
+        
+        // If this is not native async, then any Throwable is being passed as a parameter and
+        // requires wrapping
+        if( !isNativeAsync && args instanceof Throwable ) {
+        	args = new AsyncFaultWrapper( (Throwable) args ); 
+        } // end if
         
-        // on the the following will be null depending
-        // on whether this is native or non-native async
+        // If this is not native async, then the message must contain an array of args since
+        // this is what is expected when invoking an EPR for the async response...
+        if( !isNativeAsync ) {
+        	Object[] objs = new Object[1];
+        	objs[0] = args;
+        	args = objs;
+        } // end if
+
         msg.setTo(requestEndpoint);
         msg.setFrom(responseEndpointReference);
         
@@ -132,12 +152,22 @@ public class AsyncResponseInvoker<T> imp
         
     } // end method invokeAsyncResponse(Object)
 
-	private Operation getOperation() {
-		List<Operation> ops = requestEndpoint.getService().getInterfaceContract().getInterface().getOperations();
-		for (Operation op : ops) {
-			if( operationName.equals(op.getName()) ) return op;
-		} // end for
-		return null;
+	private Operation getOperation( Object args ) {
+		if( isNativeAsync ) {
+			List<Operation> ops = requestEndpoint.getService().getInterfaceContract().getInterface().getOperations();
+			for (Operation op : ops) {
+				if( operationName.equals(op.getName()) ) return op;
+			} // end for
+			return null;
+		} else {
+			operationName = "setResponse";
+			if( args instanceof Throwable ) { operationName = "setWrappedFault"; }
+			List<Operation> ops = responseEndpointReference.getReference().getInterfaceContract().getInterface().getOperations();
+			for (Operation op : ops) {
+				if( operationName.equals(op.getName()) ) return op;
+			} // end for
+			return null;
+		} // end if 
 	} // end getOperation
 
 	public void setBindingType(String bindingType) {
@@ -155,4 +185,9 @@ public class AsyncResponseInvoker<T> imp
     public RuntimeEndpointReference getResponseEndpointReference() {
 	return this.responseEndpointReference;
     }
+
+	public void setResponseEndpointReference(
+			RuntimeEndpointReference responseEndpointReference) {
+		this.responseEndpointReference = responseEndpointReference;
+	}
 } // end class

Modified: tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/Constants.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/Constants.java?rev=1061385&r1=1061384&r2=1061385&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/Constants.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/Constants.java Thu Jan 20 16:52:03 2011
@@ -22,10 +22,10 @@ package org.apache.tuscany.sca.core.invo
 /**
  * Constants used during invocation in the runtime
  *
-
  */
 public interface Constants {
-    String MESSAGE_ID = "MESSAGE_ID";
-    String RELATES_TO = "RELATES_TO";
-    String ASYNC_RESPONSE_INVOKER = "ASYNC_RESPONSE_INVOKER";
+    String MESSAGE_ID 				= "MESSAGE_ID";
+    String RELATES_TO 				= "RELATES_TO";
+    String ASYNC_RESPONSE_INVOKER 	= "ASYNC_RESPONSE_INVOKER";
+    String ASYNC_CALLBACK 			= "ASYNC_CALLBACK";
 }

Modified: tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java?rev=1061385&r1=1061384&r2=1061385&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/AsyncResponseHandlerImpl.java Thu Jan 20 16:52:03 2011
@@ -30,6 +30,7 @@ import org.apache.tuscany.sca.assembly.R
 import org.apache.tuscany.sca.assembly.Service;
 import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper;
 import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler;
+import org.apache.tuscany.sca.core.invocation.Constants;
 import org.apache.tuscany.sca.interfacedef.Operation;
 import org.apache.tuscany.sca.invocation.Invoker;
 import org.apache.tuscany.sca.invocation.Message;
@@ -155,13 +156,10 @@ public class AsyncResponseHandlerImpl<V>
 	 * @param msg - the Tuscany message containing the response from the async service invocation
 	 * which is either the Response message or an exception of some kind
 	 */
-    private static final String WS_MESSAGE_ID = "WS_MESSAGE_ID";
-    public Message invoke(Message msg) {
-		// Get the unique ID from the message header
-		String idValue = (String)msg.getHeaders().get(WS_MESSAGE_ID);
-		if (idValue == null){
-		    idValue = (String)msg.getHeaders().get("MESSAGE_ID");
-		}
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+	public Message invoke(Message msg) {
+		// Get the unique ID from the RELATES_TO message header
+		String idValue = (String)msg.getHeaders().get(Constants.RELATES_TO);
 		
 		if( idValue == null ) { 
 			System.out.println( "Async message ID not found ");

Modified: tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKCallbackInvocationHandler.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKCallbackInvocationHandler.java?rev=1061385&r1=1061384&r2=1061385&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKCallbackInvocationHandler.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/core/src/main/java/org/apache/tuscany/sca/core/invocation/impl/JDKCallbackInvocationHandler.java Thu Jan 20 16:52:03 2011
@@ -21,9 +21,15 @@ package org.apache.tuscany.sca.core.invo
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
+import org.apache.tuscany.sca.context.ThreadMessageContext;
 import org.apache.tuscany.sca.core.context.impl.CallbackServiceReferenceImpl;
+import org.apache.tuscany.sca.core.invocation.Constants;
+import org.apache.tuscany.sca.interfacedef.Operation;
 import org.apache.tuscany.sca.invocation.InvocationChain;
+import org.apache.tuscany.sca.invocation.Invoker;
+import org.apache.tuscany.sca.invocation.Message;
 import org.apache.tuscany.sca.invocation.MessageFactory;
+import org.apache.tuscany.sca.runtime.Invocable;
 import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
 import org.oasisopen.sca.ServiceReference;
 import org.oasisopen.sca.ServiceRuntimeException;
@@ -43,7 +49,7 @@ public class JDKCallbackInvocationHandle
     }
 
     @Override
-    @SuppressWarnings( {"unchecked"})
+    @SuppressWarnings( {"unchecked", "rawtypes"})
     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
         
         if (Object.class == method.getDeclaringClass()) {
@@ -65,7 +71,8 @@ public class JDKCallbackInvocationHandle
         }
 
         try {
-            return invoke(chain, args, wire);
+        	String msgID = ((CallbackServiceReferenceImpl)callableReference).getMsgID();
+            return invoke(chain, args, wire, msgID );
         } catch (InvocationTargetException e) {
             Throwable t = e.getCause();
             throw t;
@@ -73,5 +80,58 @@ public class JDKCallbackInvocationHandle
             // allow the cloned wire to be reused by subsequent callbacks
         }
     }
+    
+    /**
+     * Invoke the chain
+     * @param chain - the chain
+     * @param args - arguments to the invocation as an array of Objects
+     * @param source - the Endpoint or EndpointReference to which the chain relates
+     * @param msgID - ID of the message to which this invocation is a callback - ID ends up in "RELATES_TO" header
+     * @return - the body of the Response message from the invocation
+     * @throws Throwable - if any exception occurs during the invocation
+     */
+    @Override
+    protected Object invoke(InvocationChain chain, Object[] args, Invocable source, String msgID)
+                         throws Throwable {
+        Message msg = messageFactory.createMessage();
+        if (source instanceof RuntimeEndpointReference) {
+            msg.setFrom((RuntimeEndpointReference)source);
+        }
+        if (target != null) {
+            msg.setTo(target);
+        } else {
+            if (source instanceof RuntimeEndpointReference) {
+                msg.setTo(((RuntimeEndpointReference)source).getTargetEndpoint());
+            }
+        }
+        Invoker headInvoker = chain.getHeadInvoker();
+        Operation operation = chain.getTargetOperation();
+        msg.setOperation(operation);
+        msg.setBody(args);
+
+        Message msgContext = ThreadMessageContext.getMessageContext();
+        
+        // Deal with header information that needs to be copied from the message context to the new message...
+        transferMessageHeaders( msg, msgContext);
+        
+        ThreadMessageContext.setMessageContext(msg);
+        
+        // If there is a supplied message ID, place its value into the Message Header under "RELATES_TO"
+        if( msgID != null ){
+        	msg.getHeaders().put(Constants.RELATES_TO, msgID);
+        } // end if
+
+        try {
+            // dispatch the message down the chain and get the response
+            Message resp = headInvoker.invoke(msg);
+            Object body = resp.getBody();
+            if (resp.isFault()) {
+                throw (Throwable)body;
+            }
+            return body;
+        } finally {
+            ThreadMessageContext.setMessageContext(msgContext);
+        }
+    } // end method invoke
 
 }