You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuscany.apache.org by ed...@apache.org on 2010/07/11 12:10:43 UTC

svn commit: r963037 - in /tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation: JavaAsyncImplementationInvoker.java ResponseDispatchImpl.java

Author: edwardsmj
Date: Sun Jul 11 10:10:42 2010
New Revision: 963037

URL: http://svn.apache.org/viewvc?rev=963037&view=rev
Log:
Changes and additions to the Java invoker in support of client-side and server-side asynchronous services and @asyncInvocation, as described in TUSCANY-3608, TUSCANY-3611 and TUSCANY-3612

Modified:
    tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java
    tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java

Modified: tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java?rev=963037&r1=963036&r2=963037&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java Sun Jul 11 10:10:42 2010
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.tuscany.sca.core.factory.InstanceWrapper;
 import org.apache.tuscany.sca.core.factory.ObjectCreationException;
+import org.apache.tuscany.sca.core.invocation.AsyncResponseException;
 import org.apache.tuscany.sca.interfacedef.DataType;
 import org.apache.tuscany.sca.interfacedef.Operation;
 import org.apache.tuscany.sca.interfacedef.java.JavaOperation;
@@ -70,7 +71,7 @@ public class JavaAsyncImplementationInvo
             // For an async server method, there is an extra input parameter, which is a DispatchResponse instance 
             // which is typed by the type of the response
             Class<?> responseType = op.getOutputType().getPhysical();
-            ResponseDispatch<?> dispatch = ResponseDispatchImpl.newInstance(responseType);
+            ResponseDispatch<?> dispatch = ResponseDispatchImpl.newInstance(responseType, msg );
             
             Object ret;
             Object[] payload2;
@@ -87,15 +88,12 @@ public class JavaAsyncImplementationInvo
             
             ret = method.invoke(instance, (Object[])payload2);
             
-            try {
-            	ret = ((ResponseDispatchImpl<?>)dispatch).get(50, TimeUnit.SECONDS);
-            } catch (Throwable t) {
-            	throw new InvocationTargetException(t);
-            } // end try
-            
-            scopeContainer.returnWrapper(wrapper, contextId);
+            //ret = ((ResponseDispatchImpl<?>)dispatch).get(50, TimeUnit.SECONDS);
+            throw new InvocationTargetException( new AsyncResponseException("AsyncResponse") );
+
+            //scopeContainer.returnWrapper(wrapper, contextId);
             
-            msg.setBody(ret);
+            //msg.setBody(ret);
         } catch (InvocationTargetException e) {
             Throwable cause = e.getTargetException();
             boolean isChecked = false;

Modified: tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java?rev=963037&r1=963036&r2=963037&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java Sun Jul 11 10:10:42 2010
@@ -20,6 +20,8 @@
 package org.apache.tuscany.sca.implementation.java.invocation;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -27,7 +29,22 @@ import java.util.concurrent.locks.Condit
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.tuscany.sca.assembly.EndpointReference;
+import org.apache.tuscany.sca.context.CompositeContext;
+import org.apache.tuscany.sca.context.ThreadMessageContext;
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.FactoryExtensionPoint;
+import org.apache.tuscany.sca.core.factory.ObjectFactory;
+import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper;
+import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler;
+import org.apache.tuscany.sca.core.invocation.CallbackReferenceObjectFactory;
+import org.apache.tuscany.sca.core.invocation.ExtensibleProxyFactory;
+import org.apache.tuscany.sca.core.invocation.ProxyFactory;
+import org.apache.tuscany.sca.invocation.Message;
+import org.apache.tuscany.sca.invocation.MessageFactory;
+import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
 import org.oasisopen.sca.ResponseDispatch;
+import org.oasisopen.sca.ServiceReference;
 
 /**
  * Implementation of the ResponseDispatch interface of the OASIS SCA Java API
@@ -45,6 +62,7 @@ public class ResponseDispatchImpl<T> imp
 	 * Generated serialVersionUID value
 	 */
 	private static final long serialVersionUID = 300158355992568592L;
+    private static String WS_MESSAGE_ID = "WS_MESSAGE_ID";
 	
 	// A latch used to ensure that the sendResponse() and sendFault() operations are used at most once
 	// The latch is initialized with the value "false"
@@ -57,12 +75,24 @@ public class ResponseDispatchImpl<T> imp
 	private volatile T response = null;
 	private volatile Throwable fault = null; 
 	
-	public ResponseDispatchImpl( ) {
+	private ExtensionPointRegistry registry;
+	
+	// Service Reference used for the callback
+	private ServiceReference<AsyncResponseHandler<?>> callbackRef;
+	private String callbackAddress;
+	private String messageID;
+	
+	public ResponseDispatchImpl( Message msg ) {
 		super();
+		callbackRef = getAsyncCallbackRef( msg );
+    	
+		callbackAddress = msg.getFrom().getCallbackEndpoint().getURI();
+    	messageID = (String) msg.getHeaders().get(WS_MESSAGE_ID);
+    	
 	} // end constructor
 	
-	public static <T> ResponseDispatchImpl<T> newInstance( Class<T> type ) {
-		return new ResponseDispatchImpl<T>();
+	public static <T> ResponseDispatchImpl<T> newInstance( Class<T> type, Message msg ) {
+		return new ResponseDispatchImpl<T>( msg );
 	}
 	
 	/**
@@ -89,6 +119,10 @@ public class ResponseDispatchImpl<T> imp
 		} else {
 			throw new IllegalStateException("sendResponse() or sendFault() has been called previously");
 		} // end if
+		// Now dispatch the response to the callback...
+		AsyncResponseHandler<T> handler = (AsyncResponseHandler<T>) callbackRef.getService();
+		setResponseHeaders();
+		handler.setFault(new AsyncFaultWrapper(e));
 	} // end method sendFault
 
 	/**
@@ -108,6 +142,10 @@ public class ResponseDispatchImpl<T> imp
 		} else {
 			throw new IllegalStateException("sendResponse() or sendFault() has been called previously");
 		} // end if
+		// Now dispatch the response to the callback...
+		AsyncResponseHandler<T> handler = (AsyncResponseHandler<T>) callbackRef.getService();
+		setResponseHeaders();
+		handler.setResponse(res);
 	} // end method sendResponse
 	
 	public T get(long timeout, TimeUnit unit) throws Throwable {
@@ -133,4 +171,47 @@ public class ResponseDispatchImpl<T> imp
 	private boolean sendOK() {
 		return latch.compareAndSet(false, true);
 	}
+	
+	/**
+	 * Creates a service reference for the async callback, based on information contained in the supplied message
+	 * @param msg - the incoming message
+	 * @return - a ServiceReference for the async callback, or null if the message has no "ASYNC_CALLBACK" header
+	 */
+	@SuppressWarnings("unchecked")
+	private ServiceReference<AsyncResponseHandler<?>> getAsyncCallbackRef( Message msg ) { 
+    	RuntimeEndpointReference callbackEPR = (RuntimeEndpointReference) msg.getHeaders().get("ASYNC_CALLBACK");
+    	if( callbackEPR == null ) return null;
+    	
+    	CompositeContext compositeContext = callbackEPR.getCompositeContext();
+        registry = compositeContext.getExtensionPointRegistry();
+    	ProxyFactory proxyFactory = ExtensibleProxyFactory.getInstance(registry);
+    	List<EndpointReference> eprList = new ArrayList<EndpointReference>();
+    	eprList.add(callbackEPR);
+    	ObjectFactory<?> factory = new CallbackReferenceObjectFactory(AsyncResponseHandler.class, proxyFactory, eprList);
+    	
+    	return (ServiceReference<AsyncResponseHandler<?>>) factory.getInstance();
+    	
+    } // end method getAsyncCallbackRef
+	
+	/**
+	 * Sets the values of various headers in the response message
+	 */
+	private void setResponseHeaders() {
+		// Is there an existing message context?
+		Message msgContext = ThreadMessageContext.getMessageContext();
+		if( msgContext == null ) {
+			// Create a message context
+			msgContext = getMessageFactory().createMessage();
+		} // end if
+		
+		// Add in the header for the RelatesTo Message ID
+		msgContext.getHeaders().put(WS_MESSAGE_ID, messageID);
+		
+		ThreadMessageContext.setMessageContext(msgContext);
+	} // end method setResponseHeaders
+	
+	private MessageFactory getMessageFactory() {
+        FactoryExtensionPoint modelFactories = registry.getExtensionPoint(FactoryExtensionPoint.class);
+        return modelFactories.getFactory(MessageFactory.class);
+	} // end method getMessageFactory
 }