You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuscany.apache.org by ed...@apache.org on 2011/01/20 17:56:45 UTC

svn commit: r1061391 - /tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java

Author: edwardsmj
Date: Thu Jan 20 16:56:44 2011
New Revision: 1061391

URL: http://svn.apache.org/viewvc?rev=1061391&view=rev
Log:
Stage 1 of Axis2 binding-ws support for async callbacks, as described in TUSCANY-3821

Modified:
    tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java

Modified: tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java
URL: http://svn.apache.org/viewvc/tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java?rev=1061391&r1=1061390&r2=1061391&view=diff
==============================================================================
--- tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java (original)
+++ tuscany/sca-java-2.x/trunk/modules/implementation-java-runtime/src/main/java/org/apache/tuscany/sca/implementation/java/invocation/ResponseDispatchImpl.java Thu Jan 20 16:56:44 2011
@@ -39,6 +39,7 @@ import org.apache.tuscany.sca.core.invoc
 import org.apache.tuscany.sca.core.invocation.AsyncResponseHandler;
 import org.apache.tuscany.sca.core.invocation.AsyncResponseInvoker;
 import org.apache.tuscany.sca.core.invocation.CallbackReferenceObjectFactory;
+import org.apache.tuscany.sca.core.invocation.Constants;
 import org.apache.tuscany.sca.core.invocation.ExtensibleProxyFactory;
 import org.apache.tuscany.sca.core.invocation.ProxyFactory;
 import org.apache.tuscany.sca.invocation.Message;
@@ -64,42 +65,47 @@ public class ResponseDispatchImpl<T> imp
 	 * Generated serialVersionUID value
 	 */
 	private static final long serialVersionUID = 300158355992568592L;
-    private static String WS_MESSAGE_ID = "WS_MESSAGE_ID";
-    private static String MESSAGE_ID = "MESSAGE_ID";
 	
 	// A latch used to ensure that the sendResponse() and sendFault() operations are used at most once
 	// The latch is initialized with the value "false"
-	private transient AtomicBoolean latch = new AtomicBoolean();
+	private AtomicBoolean latch = new AtomicBoolean();
 	
-	private transient final Lock lock = new ReentrantLock();
-    private transient final Condition completed  = lock.newCondition(); 
+	private final Lock lock = new ReentrantLock();
+    private final Condition completed  = lock.newCondition(); 
 	
 	// The result
 	private transient volatile T response = null;
 	private transient volatile Throwable fault = null; 
 	
-	private ExtensionPointRegistry registry;
+	private transient ExtensionPointRegistry registry;
+	private MessageFactory msgFactory;
 	
 	// Service Reference used for the callback
-	private ServiceReference<AsyncResponseHandler<?>> callbackRef;
+	private volatile ServiceReference<AsyncResponseHandler<?>> callbackRef;
 	private AsyncResponseInvoker<?> 	respInvoker;
 	private String 						messageID;
 	
+	/**
+	 * No-arg constructor for serialization purposes
+	 */
+	public ResponseDispatchImpl() {
+		super();
+	} // end constructor
+	
 	public ResponseDispatchImpl( Message msg ) {
 		super();
-		callbackRef = getAsyncCallbackRef( msg );
 		
-    	respInvoker = (AsyncResponseInvoker)msg.getHeaders().get("ASYNC_RESPONSE_INVOKER");
+    	respInvoker = (AsyncResponseInvoker<?>)msg.getHeaders().get(Constants.ASYNC_RESPONSE_INVOKER);
     	//if( respInvoker == null ) throw new ServiceRuntimeException("Async Implementation invoked with no response invoker");
+    	
+    	if( respInvoker == null ) {
+    		callbackRef = getAsyncCallbackRef( msg );
+    	} // end if 
 		
-		// TODO - why is WS stuff bleeding into general code?
-    	messageID = (String) msg.getHeaders().get(MESSAGE_ID);
-    	if (messageID == null){
-    	    messageID = (String) msg.getHeaders().get(WS_MESSAGE_ID);
-    	}
+		messageID = (String) msg.getHeaders().get(Constants.MESSAGE_ID);
     	
 	} // end constructor
-	
+
 	public static <T> ResponseDispatchImpl<T> newInstance( Class<T> type, Message msg ) {
 		return new ResponseDispatchImpl<T>( msg );
 	}
@@ -160,16 +166,18 @@ public class ResponseDispatchImpl<T> imp
 			throw new IllegalStateException("sendResponse() or sendFault() has been called previously");
 		} // end if
 		
+		// Now dispatch the response to the callback, if present...
+		if( callbackRef != null ) {
+			AsyncResponseHandler<T> handler = (AsyncResponseHandler<T>) callbackRef.getService();
+			setResponseHeaders();
+			handler.setResponse(res);
+		} // end if
+		
 		// Use response invoker if present
 		if( respInvoker != null ) {
 			respInvoker.invokeAsyncResponse(res);
 			return;
 		} // end if
-		
-		// Now dispatch the response to the callback...
-		AsyncResponseHandler<T> handler = (AsyncResponseHandler<T>) callbackRef.getService();
-		setResponseHeaders();
-		handler.setResponse(res);
 	} // end method sendResponse
 	
 	public T get(long timeout, TimeUnit unit) throws Throwable {
@@ -203,12 +211,13 @@ public class ResponseDispatchImpl<T> imp
 	 */
 	@SuppressWarnings("unchecked")
 	private ServiceReference<AsyncResponseHandler<?>> getAsyncCallbackRef( Message msg ) { 
-    	RuntimeEndpointReference callbackEPR = (RuntimeEndpointReference) msg.getHeaders().get("ASYNC_CALLBACK");
+    	RuntimeEndpointReference callbackEPR = (RuntimeEndpointReference) msg.getHeaders().get(Constants.ASYNC_CALLBACK);
     	if( callbackEPR == null ) return null;
     	
     	CompositeContext compositeContext = callbackEPR.getCompositeContext();
         registry = compositeContext.getExtensionPointRegistry();
     	ProxyFactory proxyFactory = ExtensibleProxyFactory.getInstance(registry);
+    	msgFactory = getMessageFactory();
     	List<EndpointReference> eprList = new ArrayList<EndpointReference>();
     	eprList.add(callbackEPR);
     	ObjectFactory<?> factory = new CallbackReferenceObjectFactory(AsyncResponseHandler.class, proxyFactory, eprList);
@@ -225,12 +234,11 @@ public class ResponseDispatchImpl<T> imp
 		Message msgContext = ThreadMessageContext.getMessageContext();
 		if( msgContext == null ) {
 			// Create a message context
-			msgContext = getMessageFactory().createMessage();
+			msgContext = msgFactory.createMessage();
 		} // end if
 		
 		// Add in the header for the RelatesTo Message ID
-		msgContext.getHeaders().put(WS_MESSAGE_ID, messageID);
-		msgContext.getHeaders().put(MESSAGE_ID, messageID);
+		msgContext.getHeaders().put(Constants.RELATES_TO, messageID);
 		
 		ThreadMessageContext.setMessageContext(msgContext);
 	} // end method setResponseHeaders