You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuscany.apache.org by ed...@apache.org on 2010/07/11 12:10:43 UTC
svn commit: r963037 - in
/tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation:
JavaAsyncImplementationInvoker.java ResponseDispatchImpl.java
Author: edwardsmj
Date: Sun Jul 11 10:10:42 2010
New Revision: 963037
URL: http://svn.apache.org/viewvc?rev=963037&view=rev
Log:
Changes and additions to the Java invoker in support of client-side and server-side asynchronous services and @asyncInvocation, as described in TUSCANY-3608, TUSCANY-3611 and TUSCANY-3612
Modified:
tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java
tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java
Modified: tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java?rev=963037&r1=963036&r2=963037&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/JavaAsyncImplementationInvoker.java Sun Jul 11 10:10:42 2010
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.tuscany.sca.core.factory.InstanceWrapper;
import org.apache.tuscany.sca.core.factory.ObjectCreationException;
+import org.apache.tuscany.sca.core.invocation.AsyncResponseException;
import org.apache.tuscany.sca.interfacedef.DataType;
import org.apache.tuscany.sca.interfacedef.Operation;
import org.apache.tuscany.sca.interfacedef.java.JavaOperation;
@@ -70,7 +71,7 @@ public class JavaAsyncImplementationInvo
// For an async server method, there is an extra input parameter, which is a ResponseDispatch instance
// which is typed by the type of the response
Class<?> responseType = op.getOutputType().getPhysical();
- ResponseDispatch<?> dispatch = ResponseDispatchImpl.newInstance(responseType);
+ ResponseDispatch<?> dispatch = ResponseDispatchImpl.newInstance(responseType, msg );
Object ret;
Object[] payload2;
@@ -87,15 +88,12 @@ public class JavaAsyncImplementationInvo
ret = method.invoke(instance, (Object[])payload2);
- try {
- ret = ((ResponseDispatchImpl<?>)dispatch).get(50, TimeUnit.SECONDS);
- } catch (Throwable t) {
- throw new InvocationTargetException(t);
- } // end try
-
- scopeContainer.returnWrapper(wrapper, contextId);
+ //ret = ((ResponseDispatchImpl<?>)dispatch).get(50, TimeUnit.SECONDS);
+ throw new InvocationTargetException( new AsyncResponseException("AsyncResponse") );
+
+ //scopeContainer.returnWrapper(wrapper, contextId);
- msg.setBody(ret);
+ //msg.setBody(ret);
} catch (InvocationTargetException e) {
Throwable cause = e.getTargetException();
boolean isChecked = false;
Modified: tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java?rev=963037&r1=963036&r2=963037&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java Sun Jul 11 10:10:42 2010
@@ -20,6 +20,8 @@
package org.apache.tuscany.sca.implementation.java.invocation;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -27,7 +29,22 @@ import java.util.concurrent.locks.Condit
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.tuscany.sca.assembly.EndpointReference;
+import org.apache.tuscany.sca.context.CompositeContext;
+import org.apache.tuscany.sca.context.ThreadMessageContext;
+import org.apache.tuscany.sca.core.ExtensionPointRegistry;
+import org.apache.tuscany.sca.core.FactoryExtensionPoint;
+import org.apache.tuscany.sca.core.factory.ObjectFactory;
+import org.apache.tuscany.sca.core.invocation.AsyncFaultWrapper;
+import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler;
+import org.apache.tuscany.sca.core.invocation.CallbackReferenceObjectFactory;
+import org.apache.tuscany.sca.core.invocation.ExtensibleProxyFactory;
+import org.apache.tuscany.sca.core.invocation.ProxyFactory;
+import org.apache.tuscany.sca.invocation.Message;
+import org.apache.tuscany.sca.invocation.MessageFactory;
+import org.apache.tuscany.sca.runtime.RuntimeEndpointReference;
import org.oasisopen.sca.ResponseDispatch;
+import org.oasisopen.sca.ServiceReference;
/**
* Implementation of the ResponseDispatch interface of the OASIS SCA Java API
@@ -45,6 +62,7 @@ public class ResponseDispatchImpl<T> imp
* Generated serialVersionUID value
*/
private static final long serialVersionUID = 300158355992568592L;
+ private static String WS_MESSAGE_ID = "WS_MESSAGE_ID";
// A latch used to ensure that the sendResponse() and sendFault() operations are used at most once
// The latch is initialized with the value "false"
@@ -57,12 +75,24 @@ public class ResponseDispatchImpl<T> imp
private volatile T response = null;
private volatile Throwable fault = null;
- public ResponseDispatchImpl( ) {
+ private ExtensionPointRegistry registry;
+
+ // Service Reference used for the callback
+ private ServiceReference<AsyncResponseHandler<?>> callbackRef;
+ private String callbackAddress;
+ private String messageID;
+
+ public ResponseDispatchImpl( Message msg ) {
super();
+ callbackRef = getAsyncCallbackRef( msg );
+
+ callbackAddress = msg.getFrom().getCallbackEndpoint().getURI();
+ messageID = (String) msg.getHeaders().get(WS_MESSAGE_ID);
+
} // end constructor
- public static <T> ResponseDispatchImpl<T> newInstance( Class<T> type ) {
- return new ResponseDispatchImpl<T>();
+ public static <T> ResponseDispatchImpl<T> newInstance( Class<T> type, Message msg ) {
+ return new ResponseDispatchImpl<T>( msg );
}
/**
@@ -89,6 +119,10 @@ public class ResponseDispatchImpl<T> imp
} else {
throw new IllegalStateException("sendResponse() or sendFault() has been called previously");
} // end if
+ // Now dispatch the response to the callback...
+ AsyncResponseHandler<T> handler = (AsyncResponseHandler<T>) callbackRef.getService();
+ setResponseHeaders();
+ handler.setFault(new AsyncFaultWrapper(e));
} // end method sendFault
/**
@@ -108,6 +142,10 @@ public class ResponseDispatchImpl<T> imp
} else {
throw new IllegalStateException("sendResponse() or sendFault() has been called previously");
} // end if
+ // Now dispatch the response to the callback...
+ AsyncResponseHandler<T> handler = (AsyncResponseHandler<T>) callbackRef.getService();
+ setResponseHeaders();
+ handler.setResponse(res);
} // end method sendResponse
public T get(long timeout, TimeUnit unit) throws Throwable {
@@ -133,4 +171,47 @@ public class ResponseDispatchImpl<T> imp
private boolean sendOK() {
return latch.compareAndSet(false, true);
}
+
+ /**
+ * Creates a service reference for the async callback, based on information contained in the supplied message
+ * @param msg - the incoming message
+ * @return - a ServiceReference for the async callback, or null if the message has no "ASYNC_CALLBACK" header
+ */
+ @SuppressWarnings("unchecked")
+ private ServiceReference<AsyncResponseHandler<?>> getAsyncCallbackRef( Message msg ) {
+ RuntimeEndpointReference callbackEPR = (RuntimeEndpointReference) msg.getHeaders().get("ASYNC_CALLBACK");
+ if( callbackEPR == null ) return null;
+
+ CompositeContext compositeContext = callbackEPR.getCompositeContext();
+ registry = compositeContext.getExtensionPointRegistry();
+ ProxyFactory proxyFactory = ExtensibleProxyFactory.getInstance(registry);
+ List<EndpointReference> eprList = new ArrayList<EndpointReference>();
+ eprList.add(callbackEPR);
+ ObjectFactory<?> factory = new CallbackReferenceObjectFactory(AsyncResponseHandler.class, proxyFactory, eprList);
+
+ return (ServiceReference<AsyncResponseHandler<?>>) factory.getInstance();
+
+ } // end method getAsyncCallbackRef
+
+ /**
+ * Sets the values of various headers in the response message
+ */
+ private void setResponseHeaders() {
+ // Is there an existing message context?
+ Message msgContext = ThreadMessageContext.getMessageContext();
+ if( msgContext == null ) {
+ // Create a message context
+ msgContext = getMessageFactory().createMessage();
+ } // end if
+
+ // Add in the header for the RelatesTo Message ID
+ msgContext.getHeaders().put(WS_MESSAGE_ID, messageID);
+
+ ThreadMessageContext.setMessageContext(msgContext);
+ } // end method setResponseHeaders
+
+ private MessageFactory getMessageFactory() {
+ FactoryExtensionPoint modelFactories = registry.getExtensionPoint(FactoryExtensionPoint.class);
+ return modelFactories.getFactory(MessageFactory.class);
+ } // end method getMessageFactory
}