You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuscany.apache.org by ed...@apache.org on 2011/01/11 15:21:27 UTC

svn commit: r1057654 - /tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/

Author: edwardsmj
Date: Tue Jan 11 14:21:27 2011
New Revision: 1057654

URL: http://svn.apache.org/viewvc?rev=1057654&view=rev
Log:
Separate the Java implementation specific async invocation code from the core code - as in TUSCANY-3786

Added:
    tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncResponseInvokerImpl.java   (with props)
Modified:
    tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java
    tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaImplementationProvider.java
    tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java

Modified: tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java?rev=1057654&r1=1057653&r2=1057654&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java Tue Jan 11 14:21:27 2011
@@ -29,6 +29,9 @@ import org.apache.tuscany.sca.interfaced
 import org.apache.tuscany.sca.interfacedef.InterfaceContract;
 import org.apache.tuscany.sca.interfacedef.Operation;
 import org.apache.tuscany.sca.interfacedef.java.JavaOperation;
+import org.apache.tuscany.sca.invocation.InterceptorAsync;
+import org.apache.tuscany.sca.invocation.Invoker;
+import org.apache.tuscany.sca.invocation.InvokerAsyncResponse;
 import org.apache.tuscany.sca.invocation.Message;
 import org.apache.tuscany.sca.runtime.RuntimeComponent;
 import org.oasisopen.sca.ResponseDispatch;
@@ -39,7 +42,7 @@ import org.oasisopen.sca.ServiceRuntimeE
  * implementation instance
  * 
  */
-public class JavaAsyncImplementationInvoker extends JavaImplementationInvoker {
+public class JavaAsyncImplementationInvoker extends JavaImplementationInvoker implements InterceptorAsync {
 	
     public JavaAsyncImplementationInvoker(Operation operation, Method method, RuntimeComponent component, 
     		InterfaceContract interfaceContract) {
@@ -53,16 +56,13 @@ public class JavaAsyncImplementationInvo
         
         Object payload = msg.getBody();
 
-        Object contextId = null;
-
-        // store the current thread context classloader
-        // - replace it with the class loader used to load the java class as per SCA Spec
+        // Save the current thread context classloader
         ClassLoader tccl = Thread.currentThread().getContextClassLoader();
         
         try {
             // The following call might create a new conversation, as a result, the msg.getConversationID() might 
             // return a new value
-            InstanceWrapper wrapper = scopeContainer.getWrapper(contextId);
+            InstanceWrapper wrapper = scopeContainer.getWrapper(null);
 
             Object instance = wrapper.getInstance();
            
@@ -89,12 +89,8 @@ public class JavaAsyncImplementationInvo
             
             ret = method.invoke(instance, (Object[])payload2);
             
-            //ret = ((ResponseDispatchImpl<?>)dispatch).get(50, TimeUnit.SECONDS);
             throw new InvocationTargetException( new AsyncResponseException("AsyncResponse") );
 
-            //scopeContainer.returnWrapper(wrapper, contextId);
-            
-            //msg.setBody(ret);
         } catch (InvocationTargetException e) {
             Throwable cause = e.getTargetException();
             boolean isChecked = false;
@@ -104,7 +100,7 @@ public class JavaAsyncImplementationInvo
                     msg.setFaultBody(cause);
                     break;
                 }
-            } 
+            } // end for
             
             if (!isChecked) {
                 if (cause instanceof RuntimeException) {
@@ -115,7 +111,7 @@ public class JavaAsyncImplementationInvo
                 } else {
                     throw new ServiceRuntimeException(cause.getMessage(), cause);
                 }
-            }            
+            }  // end if          
                 
         } catch (ObjectCreationException e) {
             throw new ServiceRuntimeException(e.getMessage(), e);
@@ -128,4 +124,110 @@ public class JavaAsyncImplementationInvo
         return msg;
     } // end method invoke
 
-}
+    protected Invoker next;
+    protected InvokerAsyncResponse previous;
+	
+	public void setNext(Invoker next) {
+		this.next = next;
+	}
+
+	public Invoker getNext() {
+		return next;
+	}
+
+	public void invokeAsyncRequest(Message msg) throws Throwable {
+		processRequest(msg);
+	} // end method invokeAsyncRequest
+
+	public void invokeAsyncResponse(Message msg) {
+		msg = processResponse(msg);
+        InvokerAsyncResponse thePrevious = (InvokerAsyncResponse)getPrevious();
+        if (thePrevious != null ) thePrevious.invokeAsyncResponse(msg);
+	} // end method invokeAsyncResponse
+
+	public void setPrevious(InvokerAsyncResponse previous) {
+		this.previous = previous;
+	}
+
+	public InvokerAsyncResponse getPrevious() {
+		return previous;
+	}
+
+	public Message processRequest(Message msg) {
+        Operation op 	= this.operation;
+        Object payload 	= msg.getBody();
+
+        // Replace TCCL with the class loader used to load the java class as per SCA Spec
+        ClassLoader tccl = Thread.currentThread().getContextClassLoader();
+        
+        try {
+            InstanceWrapper wrapper = scopeContainer.getWrapper(null);
+            Object instance = wrapper.getInstance();
+           
+            // Set the TCCL to the classloader used to load the implementation class
+            Thread.currentThread().setContextClassLoader(instance.getClass().getClassLoader());
+            
+            // For an async server method, there is an extra input parameter, which is a DispatchResponse instance 
+            // which is typed by the type of the response
+            Class<?> responseType = op.getOutputType().getPhysical();
+            ResponseDispatch<?> dispatch = ResponseDispatchImpl.newInstance(responseType, msg );
+            
+            Object[] payload2;
+            if (payload != null && !payload.getClass().isArray()) {
+            	payload2 = new Object[2];
+            	payload2[0] = payload;
+            } else {
+            	payload2 = new Object[ ((Object[])payload).length + 1 ];
+            	for( int i = 0; i < ((Object[])payload).length; i++) {
+            		payload2[i] = ((Object[])payload)[i];
+            	} // end for
+            }
+            payload2[ payload2.length - 1 ] = dispatch;
+            
+            method.invoke(instance, (Object[])payload2);
+            
+        } catch (InvocationTargetException e) {
+            Throwable cause = e.getTargetException();
+            boolean isChecked = false;
+            for (DataType<?> d : operation.getFaultTypes()) {
+                if (d.getPhysical().isInstance(cause)) {
+                    isChecked = true;
+                    // Ignore these errors since they should be returned asynchronously
+                    break;
+                }
+            } // end for
+            
+            if (!isChecked) {
+                if (cause instanceof RuntimeException) {
+                    throw (RuntimeException)cause;
+                } // end if
+                if (cause instanceof Error) {
+                    throw (Error)cause;
+                } else {
+                    throw new ServiceRuntimeException(cause.getMessage(), cause);
+                } // end if
+            } // end if           
+                
+        } catch (Exception e) {
+            throw new ServiceRuntimeException(e.getMessage(), e);        
+        } finally {
+            // set the tccl 
+            Thread.currentThread().setContextClassLoader(tccl);
+        }
+        return msg;
+	} // end method processRequest
+
+	public Message postProcessRequest(Message msg) {
+		return msg;
+	}
+
+	public Message postProcessRequest(Message msg, Throwable e)
+			throws Throwable {
+		throw e;
+	}
+
+	public Message processResponse(Message msg) {
+		return msg;
+	} // end method processResponse
+
+} // end class JavaAsyncImplementationInvoker

Added: tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncResponseInvokerImpl.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncResponseInvokerImpl.java?rev=1057654&view=auto
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncResponseInvokerImpl.java (added)
+++ tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncResponseInvokerImpl.java Tue Jan 11 14:21:27 2011
@@ -0,0 +1,95 @@
+package org.apache.tuscany.sca.implementation.java.invocation;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper;
+import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler;
+import org.apache.tuscany.sca.core.invocation.JDKAsyncResponseInvoker;
+import org.apache.tuscany.sca.invocation.Message;
+import org.oasisopen.sca.ServiceRuntimeException;
+
+/**
+ * Class which handles the asynchronous response message from an async service back to the client Java component
+ * 
+ * This class provides a registration function which permits the reference invoking code to register the Future
+ * which is used to return the response to the Java component code
+ */
+public class JavaAsyncResponseInvokerImpl implements JDKAsyncResponseInvoker {
+	
+    // Map used to link between async requests and async responses
+    private ConcurrentMap<String, Object> asyncMessageMap;
+
+	public JavaAsyncResponseInvokerImpl() {
+
+	    asyncMessageMap = new ConcurrentHashMap<String, Object>();
+	} // end constructor
+	
+	/**
+	 * Deal with the asynchronous response message
+	 * @param msg - the response message
+	 * 
+	 * The response message must contain a RELATES_TO id, which is used to identify the Future that represents
+	 * the operation yet to complete.  The Future is then completed with the content of the message.
+	 * The Future either calls back to the application code, or it is expected that the application is polling
+	 * the Future for its completion.
+	 */
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	public void invokeAsyncResponse(Message msg) {
+		// Obtain the Message ID for this message
+		String relatesID = getMessageRelatesID( msg );
+		if( relatesID == null ) 
+			throw new ServiceRuntimeException("JavaAsyncResponseInvoker - response message has no RELATES_TO id");
+		
+		// Look up the response object & remove it from the Map
+		Object responseHandler = asyncMessageMap.remove(relatesID);
+		
+		if( responseHandler == null ) 
+			throw new ServiceRuntimeException("JavaAsyncResponseInvoker - no Future matches the RELATES_TO id: " + relatesID);
+		
+		// Invoke the response handler with the content of the message 
+		// - in the case of a Java implementation, the response handler is a Future...
+		AsyncResponseHandler future = (AsyncResponseHandler) responseHandler;
+		
+		Object payload = msg.getBody();
+		Object response;
+		if( payload == null ) {
+			System.out.println("Returned response message was null");
+		} else {
+            if (payload.getClass().isArray()) {
+                response = ((Object[])payload)[0];
+            } else {
+                response = payload;
+            } // end if
+            if( response.getClass().equals(AsyncFaultWrapper.class)) {
+            	future.setWrappedFault((AsyncFaultWrapper) response );
+            } else if ( response instanceof Throwable ) {
+            	future.setFault( (Throwable)response );
+            } else {
+            	future.setResponse(response);
+            } // end if
+		} // end if
+
+	} // end method invokeAsyncResponse
+	
+	/**
+	 * Registers an Async response, which provides an ID which identifies a given response
+	 * and an object which can handle the response
+	 * @param id - the ID
+	 * @param responseHandler - the response handler object
+	 */
+	public void registerAsyncResponse( String id, Object responseHandler ) {
+		// Add the ID/response handler mapping into the table
+		if( id != null && responseHandler != null ) asyncMessageMap.put(id, responseHandler);
+	} // end method registerAsyncResponse
+	
+	/**
+	 * Extracts the RELATES_TO header from the message
+	 * @param msg - the Tuscany message
+	 * @return - the value of the RELATES_TO header as a String
+	 */
+	private String getMessageRelatesID( Message msg ) {
+		return (String)msg.getHeaders().get("RELATES_TO");
+	} // end method getMessageRelatesID
+
+} // end class JavaAsyncResponseInvoker

Propchange: tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncResponseInvokerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncResponseInvokerImpl.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaImplementationProvider.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaImplementationProvider.java?rev=1057654&r1=1057653&r2=1057654&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaImplementationProvider.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaImplementationProvider.java Tue Jan 11 14:21:27 2011
@@ -40,6 +40,9 @@ import org.apache.tuscany.sca.interfaced
 import org.apache.tuscany.sca.interfacedef.java.JavaInterface;
 import org.apache.tuscany.sca.interfacedef.java.impl.JavaInterfaceUtil;
 import org.apache.tuscany.sca.invocation.Invoker;
+import org.apache.tuscany.sca.invocation.InvokerAsyncRequest;
+import org.apache.tuscany.sca.invocation.InvokerAsyncResponse;
+import org.apache.tuscany.sca.provider.ImplementationAsyncProvider;
 import org.apache.tuscany.sca.runtime.RuntimeComponent;
 import org.apache.tuscany.sca.runtime.RuntimeComponentService;
 import org.oasisopen.sca.ComponentContext;
@@ -48,7 +51,7 @@ import org.oasisopen.sca.RequestContext;
 /**
  * @version $Rev$ $Date$
  */
-public class JavaImplementationProvider implements ScopedImplementationProvider {
+public class JavaImplementationProvider implements ScopedImplementationProvider, ImplementationAsyncProvider {
     private JavaImplementation implementation;
     private JavaComponentContextProvider componentContextProvider;
     private RequestContextFactory requestContextFactory;
@@ -166,4 +169,14 @@ public class JavaImplementationProvider 
         return implementation.isEagerInit();
     }
 
+	public InvokerAsyncRequest createAsyncInvoker(RuntimeComponentService service, Operation operation) {
+		// createInvoker should automatically create a JavaAsyncImplementationInvoker - if not, then it means 
+		// that the service is not async and the result will be an exception caused by the class cast.
+		return (InvokerAsyncRequest) createInvoker( service, operation );
+	} // end method createAsyncInvoker
+
+	public InvokerAsyncResponse createAsyncResponseInvoker(Operation operation) {
+		return new JavaAsyncResponseInvokerImpl();
+	} // end method createAsyncResponseInvoker
+
 }

Modified: tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java?rev=1057654&r1=1057653&r2=1057654&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java Tue Jan 11 14:21:27 2011
@@ -37,6 +37,7 @@ import org.apache.tuscany.sca.core.Facto
 import org.apache.tuscany.sca.core.factory.ObjectFactory;
 import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper;
 import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler;
+import org.apache.tuscany.sca.core.invocation.AsyncResponseInvoker;
 import org.apache.tuscany.sca.core.invocation.CallbackReferenceObjectFactory;
 import org.apache.tuscany.sca.core.invocation.ExtensibleProxyFactory;
 import org.apache.tuscany.sca.core.invocation.ProxyFactory;
@@ -45,6 +46,7 @@ import org.apache.tuscany.sca.invocation
 import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
 import org.oasisopen.sca.ResponseDispatch;
 import org.oasisopen.sca.ServiceReference;
+import org.oasisopen.sca.ServiceRuntimeException;
 
 /**
  * Implementation of the ResponseDispatch interface of the OASIS SCA Java API
@@ -80,12 +82,16 @@ public class ResponseDispatchImpl<T> imp
 	
 	// Service Reference used for the callback
 	private ServiceReference<AsyncResponseHandler<?>> callbackRef;
-	private String messageID;
+	private AsyncResponseInvoker<?> 	respInvoker;
+	private String 						messageID;
 	
 	public ResponseDispatchImpl( Message msg ) {
 		super();
 		callbackRef = getAsyncCallbackRef( msg );
 		
+    	respInvoker = (AsyncResponseInvoker)msg.getHeaders().get("ASYNC_RESPONSE_INVOKER");
+    	//if( respInvoker == null ) throw new ServiceRuntimeException("Async Implementation invoked with no response invoker");
+		
 		// TODO - why is WS stuff bleeding into general code?
     	messageID = (String) msg.getHeaders().get(MESSAGE_ID);
     	if (messageID == null){
@@ -122,10 +128,18 @@ public class ResponseDispatchImpl<T> imp
 		} else {
 			throw new IllegalStateException("sendResponse() or sendFault() has been called previously");
 		} // end if
+		
+		// Use response invoker if present
+		if( respInvoker != null ) {
+			//respInvoker.invokeAsyncResponse(new AsyncFaultWrapper(e));
+			respInvoker.invokeAsyncResponse(e);
+			return;
+		} // end if
+		
 		// Now dispatch the response to the callback...
 		AsyncResponseHandler<T> handler = (AsyncResponseHandler<T>) callbackRef.getService();
 		setResponseHeaders();
-		handler.setFault(new AsyncFaultWrapper(e));
+		handler.setWrappedFault(new AsyncFaultWrapper(e));
 	} // end method sendFault
 
 	/**
@@ -145,6 +159,13 @@ public class ResponseDispatchImpl<T> imp
 		} else {
 			throw new IllegalStateException("sendResponse() or sendFault() has been called previously");
 		} // end if
+		
+		// Use response invoker if present
+		if( respInvoker != null ) {
+			respInvoker.invokeAsyncResponse(res);
+			return;
+		} // end if
+		
 		// Now dispatch the response to the callback...
 		AsyncResponseHandler<T> handler = (AsyncResponseHandler<T>) callbackRef.getService();
 		setResponseHeaders();