You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by eg...@apache.org on 2007/02/07 12:19:21 UTC

svn commit: r504504 - in /incubator/cxf/trunk/rt/transports: http/src/main/java/org/apache/cxf/transport/http/ http/src/test/java/org/apache/cxf/transport/http/ http2/src/main/java/org/apache/cxf/transport/http/ http2/src/test/java/org/apache/cxf/trans...

Author: eglynn
Date: Wed Feb  7 03:19:20 2007
New Revision: 504504

URL: http://svn.apache.org/viewvc?view=rev&rev=504504
Log:
Handle incoming decoupled response via an interposed MessageObserver on an HTTP Destination, so as to remove HTTPConduit dependency on Jetty.

Modified:
    incubator/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
    incubator/cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java
    incubator/cxf/trunk/rt/transports/http2/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
    incubator/cxf/trunk/rt/transports/http2/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java

Modified: incubator/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java?view=diff&rev=504504&r1=504503&r2=504504
==============================================================================
--- incubator/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java (original)
+++ incubator/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java Wed Feb  7 03:19:20 2007
@@ -19,7 +19,6 @@
 
 package org.apache.cxf.transport.http;
 
-import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -51,16 +50,14 @@
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.AbstractConduit;
-import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.Destination;
+import org.apache.cxf.transport.DestinationFactory;
+import org.apache.cxf.transport.DestinationFactoryManager;
 import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
 import org.apache.cxf.ws.addressing.AttributedURIType;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.cxf.wsdl.EndpointReferenceUtils;
-import org.mortbay.http.HttpRequest;
-import org.mortbay.http.HttpResponse;
-import org.mortbay.http.handler.AbstractHttpHandler;
 
 import static org.apache.cxf.message.Message.DECOUPLED_CHANNEL_MESSAGE;
 
@@ -77,9 +74,9 @@
     private URLConnectionFactory connectionFactory;
     private URL url;
     
-    private ServerEngine decoupledEngine;
-    private URL decoupledURL;
-    private DecoupledDestination decoupledDestination;
+    private Destination decoupledDestination;
+    private MessageObserver decoupledObserver;
+    private int decoupledDestinationRefCount;
     private EndpointInfo endpointInfo;
     
     // COnfiguration values
@@ -114,7 +111,6 @@
         this(b,
              ei,
              t,
-             null,
              null);
     }    
 
@@ -126,14 +122,12 @@
      * @param ei the endpoint info of the initiator
      * @param t the endpoint reference of the target
      * @param factory the URL connection factory
-     * @param eng the decoupled engine
      * @throws IOException
      */
     public HTTPConduit(Bus b,
                        EndpointInfo ei,
                        EndpointReferenceType t,
-                       URLConnectionFactory factory,
-                       ServerEngine eng) throws IOException {
+                       URLConnectionFactory factory) throws IOException {
         super(getTargetReference(ei, t));
         bus = b;
         endpointInfo = ei;
@@ -141,7 +135,6 @@
 
         initConfig();
         
-        decoupledEngine = eng;
         url = t == null
               ? new URL(endpointInfo.getAddress())
               : new URL(t.getAddress().getValue());
@@ -231,7 +224,7 @@
     public synchronized Destination getBackChannel() {
         if (decoupledDestination == null
             && getClient().getDecoupledEndpoint() != null) {
-            decoupledDestination = setUpDecoupledDestination(); 
+            setUpDecoupledDestination(); 
         }
         return decoupledDestination;
     }
@@ -255,12 +248,8 @@
         // in decoupled case, close response Destination if reference count
         // hits zero
         //
-        if (decoupledURL != null && decoupledEngine != null) {
-            DecoupledHandler decoupledHandler = 
-                (DecoupledHandler)decoupledEngine.getServant(decoupledURL);
-            if (decoupledHandler != null) {
-                decoupledHandler.release();
-            }
+        if (decoupledDestination != null) {
+            releaseDecoupledDestination();
         }
     }
 
@@ -358,10 +347,8 @@
 
     /**
      * Set up the decoupled Destination if necessary.
-     * 
-     * @return an appropriate decoupled Destination
      */
-    private DecoupledDestination setUpDecoupledDestination() {        
+    private void setUpDecoupledDestination() {        
         EndpointReferenceType reference =
             EndpointReferenceUtils.getEndpointReference(
                 getClient().getDecoupledEndpoint());
@@ -369,102 +356,50 @@
             String decoupledAddress = reference.getAddress().getValue();
             LOG.info("creating decoupled endpoint: " + decoupledAddress);
             try {
-                decoupledURL = new URL(decoupledAddress);
-                if (decoupledEngine == null) {
-                    decoupledEngine = 
-                        JettyHTTPServerEngine.getForPort(bus, 
-                                                         decoupledURL.getProtocol(),
-                                                         decoupledURL.getPort());
-                }
-                DecoupledHandler decoupledHandler =
-                    (DecoupledHandler)decoupledEngine.getServant(decoupledURL);
-                if (decoupledHandler == null) {
-                    decoupledHandler = new DecoupledHandler();
-                    decoupledEngine.addServant(decoupledURL, decoupledHandler);
-                } 
-                decoupledHandler.duplicate();
+                decoupledDestination = getDestination(decoupledAddress);
+                duplicateDecoupledDestination();
             } catch (Exception e) {
                 // REVISIT move message to localizable Messages.properties
                 LOG.log(Level.WARNING, "decoupled endpoint creation failed: ", e);
             }
         }
-        return new DecoupledDestination(reference, incomingObserver);
     }
-    
+
     /**
-     * Wrapper output stream responsible for flushing headers and handling
-     * the incoming HTTP-level response (not necessarily the MEP response).
+     * @param address the address
+     * @return a Destination for the address
      */
-    private class WrappedOutputStream extends AbstractWrappedOutputStream {
-        protected URLConnection connection;
-        
-        WrappedOutputStream(Message m, URLConnection c) {
-            super(m);
-            connection = c;
-        }
-
-        /**
-         * Perform any actions required on stream flush (freeze headers,
-         * reset output stream ... etc.)
-         */
-        protected void doFlush() throws IOException {
-            if (!alreadyFlushed()) {                
-                flushHeaders(outMessage);
-                if (connection instanceof HttpURLConnection) {            
-                    HttpURLConnection hc = (HttpURLConnection)connection;                    
-                    if (hc.getRequestMethod().equals("GET")) {
-                        return;
-                    }
-                }
-                resetOut(connection.getOutputStream(), true);
-            }
-        }
-
-        /**
-         * Perform any actions required on stream closure (handle response etc.)
-         */
-        protected void doClose() throws IOException {
-            handleResponse();
+    private Destination getDestination(String address) throws IOException {
+        Destination destination = null;
+        DestinationFactoryManager factoryManager =
+            bus.getExtension(DestinationFactoryManager.class);
+        DestinationFactory factory =
+            factoryManager.getDestinationFactoryForUri(address);
+        if (factory != null) {
+            EndpointInfo ei = new EndpointInfo();
+            ei.setAddress(address);
+            destination = factory.getDestination(ei);
+            decoupledObserver = new InterposedMessageObserver();
+            destination.setMessageObserver(decoupledObserver);
         }
-        
-        protected void onWrite() throws IOException {
-            
-        }
-
-        private void handleResponse() throws IOException {
-            Exchange exchange = outMessage.getExchange();
-            int responseCode = getResponseCode(connection);
-            if (isOneway(exchange)
-                && !isPartialResponse(connection, responseCode)) {
-                // oneway operation without partial response
-                connection.getInputStream().close();
-                return;
-            }
-            
-            Message inMessage = new MessageImpl();
-            inMessage.setExchange(exchange);
-            InputStream in = null;
-            Map<String, List<String>> headers = new HashMap<String, List<String>>();
-            for (String key : connection.getHeaderFields().keySet()) {
-                headers.put(HttpHeaderHelper.getHeaderKey(key), connection.getHeaderFields().get(key));
-            }
-            inMessage.put(Message.PROTOCOL_HEADERS, headers);
-            inMessage.put(Message.RESPONSE_CODE, responseCode);
-            inMessage.put(Message.CONTENT_TYPE, connection.getHeaderField(HttpHeaderHelper.CONTENT_TYPE));
-
-            if (connection instanceof HttpURLConnection) {
-                HttpURLConnection hc = (HttpURLConnection)connection;
-                in = hc.getErrorStream();
-                if (null == in) {
-                    in = connection.getInputStream();
-                }
-            } else {
-                in = connection.getInputStream();
-            }
-            
-            inMessage.setContent(InputStream.class, in);
-            
-            incomingObserver.onMessage(inMessage);
+        return destination;
+    }
+    
+    /**
+     * @return the decoupled observer
+     */
+    protected MessageObserver getDecoupledObserver() {
+        return decoupledObserver;
+    }
+    
+    private synchronized void duplicateDecoupledDestination() {
+        decoupledDestinationRefCount++;
+    }
+    
+    private synchronized void releaseDecoupledDestination() {
+        if (--decoupledDestinationRefCount == 0) {
+            LOG.log(Level.INFO, "shutting down decoupled destination");
+            decoupledDestination.shutdown();
         }
     }
     
@@ -487,113 +422,6 @@
                && connection.getContentLength() != 0;
     }
 
-    /**
-     * Wrapper output stream responsible for commiting incoming request 
-     * containing a decoupled response.
-     */
-    private class WrapperInputStream extends FilterInputStream {
-        HttpRequest request;
-        HttpResponse response;
-        boolean closed;
-        
-        WrapperInputStream(InputStream is,
-                           HttpRequest req,
-                           HttpResponse resp) {
-            super(is);
-            request = req;
-            response = resp;
-        }
-        
-        public void close() throws IOException {
-            if (!closed) {
-                closed = true;
-                response.commit();
-                request.setHandled(true);
-            }
-        }
-    }
-    
-    /**
-     * Represented decoupled response endpoint.
-     */
-    protected class DecoupledDestination implements Destination {
-        protected MessageObserver decoupledMessageObserver;
-        private EndpointReferenceType address;
-        
-        DecoupledDestination(EndpointReferenceType ref,
-                             MessageObserver incomingObserver) {
-            address = ref;
-            decoupledMessageObserver = incomingObserver;
-        }
-
-        public EndpointReferenceType getAddress() {
-            return address;
-        }
-
-        public Conduit getBackChannel(Message inMessage,
-                                      Message partialResponse,
-                                      EndpointReferenceType addr)
-            throws IOException {
-            // shouldn't be called on decoupled endpoint
-            return null;
-        }
-
-        public void shutdown() {
-            // TODO Auto-generated method stub            
-        }
-
-        public synchronized void setMessageObserver(MessageObserver observer) {
-            decoupledMessageObserver = observer;
-        }
-        
-        protected synchronized MessageObserver getMessageObserver() {
-            return decoupledMessageObserver;
-        }
-    }
-
-    /**
-     * Handles incoming decoupled responses.
-     */
-    private class DecoupledHandler extends AbstractHttpHandler {
-        private int refCount;
-                
-        synchronized void duplicate() {
-            refCount++;
-        }
-        
-        synchronized void release() {
-            if (--refCount == 0) {
-                decoupledEngine.removeServant(decoupledURL);
-                JettyHTTPServerEngine.destroyForPort(decoupledURL.getPort());
-            }
-        }
-        
-        public void handle(String pathInContext, 
-                           String pathParams,
-                           HttpRequest req,
-                           HttpResponse resp) throws IOException {
-            InputStream responseStream = req.getInputStream();
-            Message inMessage = new MessageImpl();
-            // disposable exchange, swapped with real Exchange on correlation
-            inMessage.setExchange(new ExchangeImpl());
-            inMessage.put(DECOUPLED_CHANNEL_MESSAGE, Boolean.TRUE);
-            // REVISIT: how to get response headers?
-            //inMessage.put(Message.PROTOCOL_HEADERS, req.getXXX());
-            setHeaders(inMessage);
-            inMessage.put(Message.ENCODING, resp.getCharacterEncoding());
-            inMessage.put(Message.CONTENT_TYPE, resp.getContentType());
-            inMessage.put(Message.RESPONSE_CODE, HttpURLConnection.HTTP_OK);
-            InputStream is = new WrapperInputStream(responseStream, req, resp);
-            inMessage.setContent(InputStream.class, is);
-
-            try {
-                decoupledDestination.getMessageObserver().onMessage(inMessage);    
-            } finally {
-                is.close();
-            }
-        }
-    }    
-
     private void initConfig() {
         // Initialize some default values for the configuration
         client = endpointInfo.getTraversedExtensor(new HTTPClientPolicy(), HTTPClientPolicy.class);
@@ -749,5 +577,104 @@
         this.sslClient = sslClient;
     }  
     
-    
+    /**
+     * Wrapper output stream responsible for flushing headers and handling
+     * the incoming HTTP-level response (not necessarily the MEP response).
+     */
+    private class WrappedOutputStream extends AbstractWrappedOutputStream {
+        protected URLConnection connection;
+        
+        WrappedOutputStream(Message m, URLConnection c) {
+            super(m);
+            connection = c;
+        }
+
+        /**
+         * Perform any actions required on stream flush (freeze headers,
+         * reset output stream ... etc.)
+         */
+        protected void doFlush() throws IOException {
+            if (!alreadyFlushed()) {                
+                flushHeaders(outMessage);
+                if (connection instanceof HttpURLConnection) {            
+                    HttpURLConnection hc = (HttpURLConnection)connection;                    
+                    if (hc.getRequestMethod().equals("GET")) {
+                        return;
+                    }
+                }
+                resetOut(connection.getOutputStream(), true);
+            }
+        }
+
+        /**
+         * Perform any actions required on stream closure (handle response etc.)
+         */
+        protected void doClose() throws IOException {
+            handleResponse();
+        }
+        
+        protected void onWrite() throws IOException {
+            
+        }
+
+        private void handleResponse() throws IOException {
+            Exchange exchange = outMessage.getExchange();
+            int responseCode = getResponseCode(connection);
+            if (isOneway(exchange)
+                && !isPartialResponse(connection, responseCode)) {
+                // oneway operation without partial response
+                connection.getInputStream().close();
+                return;
+            }
+            
+            Message inMessage = new MessageImpl();
+            inMessage.setExchange(exchange);
+            InputStream in = null;
+            Map<String, List<String>> headers = new HashMap<String, List<String>>();
+            for (String key : connection.getHeaderFields().keySet()) {
+                headers.put(HttpHeaderHelper.getHeaderKey(key), connection.getHeaderFields().get(key));
+            }
+            inMessage.put(Message.PROTOCOL_HEADERS, headers);
+            inMessage.put(Message.RESPONSE_CODE, responseCode);
+            inMessage.put(Message.CONTENT_TYPE, connection.getHeaderField(HttpHeaderHelper.CONTENT_TYPE));
+
+            if (connection instanceof HttpURLConnection) {
+                HttpURLConnection hc = (HttpURLConnection)connection;
+                in = hc.getErrorStream();
+                if (null == in) {
+                    in = connection.getInputStream();
+                }
+            } else {
+                in = connection.getInputStream();
+            }
+            
+            inMessage.setContent(InputStream.class, in);
+            
+            incomingObserver.onMessage(inMessage);
+        }
+    }
+       
+    /**
+     * Used to set appropriate message properties, exchange etc.
+     * as required for an incoming decoupled response (as opposed
+     * to what's normally set by the Destination for an incoming
+     * request).
+     */
+    protected class InterposedMessageObserver implements MessageObserver {
+        /**
+         * Called for an incoming message.
+         * 
+         * @param inMessage
+         */
+        public void onMessage(Message inMessage) {
+            // disposable exchange, swapped with real Exchange on correlation
+            inMessage.setExchange(new ExchangeImpl());
+            inMessage.put(DECOUPLED_CHANNEL_MESSAGE, Boolean.TRUE);
+            // REVISIT: how to get response headers?
+            //inMessage.put(Message.PROTOCOL_HEADERS, req.getXXX());
+            setHeaders(inMessage);
+            inMessage.put(Message.RESPONSE_CODE, HttpURLConnection.HTTP_OK);
+            incomingObserver.onMessage(inMessage);
+        }
+    }
 }

Modified: incubator/cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java?view=diff&rev=504504&r1=504503&r2=504504
==============================================================================
--- incubator/cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java (original)
+++ incubator/cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java Wed Feb  7 03:19:20 2007
@@ -40,16 +40,17 @@
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.service.model.EndpointInfo;
-import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.Destination;
+import org.apache.cxf.transport.DestinationFactory;
+import org.apache.cxf.transport.DestinationFactoryManager;
 import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.cxf.wsdl.EndpointReferenceUtils;
 import org.easymock.classextension.EasyMock;
 import org.easymock.classextension.IMocksControl;
-import org.mortbay.http.HttpHandler;
-import org.mortbay.http.handler.AbstractHttpHandler;
-import org.mortbay.util.MultiMap;
+
+import static org.apache.cxf.message.Message.DECOUPLED_CHANNEL_MESSAGE;
+
 
 public class HTTPConduitTest extends TestCase {
     private static final String NOWHERE = "http://nada.nothing.nowhere.null/";
@@ -63,8 +64,6 @@
     private MessageObserver observer;
     private OutputStream os;
     private InputStream is;
-    private TestServerEngine decoupledEngine;
-    private MultiMap parameters;
     private IMocksControl control;
     
     public void setUp() throws Exception {
@@ -83,8 +82,6 @@
         observer = null;
         os = null;
         is = null;
-        parameters = null;
-        decoupledEngine = null;
     }
 
     public void testGetTarget() throws Exception {
@@ -198,24 +195,34 @@
                     ((HttpURLConnection)connection).setChunkedStreamingMode(2048);
                     EasyMock.expectLastCall();                    
                 }
-            }
-            
-            if (decoupled) {
-                decoupledEngine = new TestServerEngine();
-                parameters = control.createMock(MultiMap.class);
-            }            
-            
+            }         
+        }
+
+        CXFBusImpl bus = new CXFBusImpl();
+        URL decoupledURL = null;
+        if (decoupled) {
+            decoupledURL = new URL(NOWHERE + "response");
+            DestinationFactoryManager mgr =
+                control.createMock(DestinationFactoryManager.class);
+            DestinationFactory factory =
+                control.createMock(DestinationFactory.class);
+            Destination destination =
+                control.createMock(Destination.class);
+
+            bus.setExtension(mgr, DestinationFactoryManager.class);
+            mgr.getDestinationFactoryForUri(decoupledURL.toString());
+            EasyMock.expectLastCall().andReturn(factory);
+            factory.getDestination(EasyMock.isA(EndpointInfo.class));
+            EasyMock.expectLastCall().andReturn(destination);
+            destination.setMessageObserver(EasyMock.isA(HTTPConduit.InterposedMessageObserver.class));
         }
-               
         
         control.replay();
         
-        CXFBusImpl bus = new CXFBusImpl();
         HTTPConduit conduit = new HTTPConduit(bus, 
                                               endpointInfo,
                                               null,
-                                              connectionFactory,
-                                              decoupledEngine);
+                                              connectionFactory);
         conduit.retrieveConnectionFactory();
 
         if (send) {
@@ -230,13 +237,11 @@
         }
 
         if (decoupled) {
-            URL decoupledURL = null;
-            if (decoupled) {
-                decoupledURL = new URL(NOWHERE + "response");
-                conduit.getClient().setDecoupledEndpoint(decoupledURL.toString());
-            } 
+            conduit.getClient().setDecoupledEndpoint(decoupledURL.toString());
+            assertNotNull("expected back channel", conduit.getBackChannel());
+        } else {
+            assertNull("unexpected back channel", conduit.getBackChannel());
         }
-       
 
         observer = new MessageObserver() {
             public void onMessage(Message m) {
@@ -247,19 +252,19 @@
         return conduit;
     }
     
-    private void verifySentMessage(Conduit conduit, Message message)
+    private void verifySentMessage(HTTPConduit conduit, Message message)
         throws IOException {
         verifySentMessage(conduit, message, false);
     }
 
-    private void verifySentMessage(Conduit conduit,
+    private void verifySentMessage(HTTPConduit conduit,
                                    Message message,
                                    boolean expectHeaders)
         throws IOException {
         verifySentMessage(conduit, message, expectHeaders, false);
     }
     
-    private void verifySentMessage(Conduit conduit,
+    private void verifySentMessage(HTTPConduit conduit,
                                    Message message,
                                    boolean expectHeaders,
                                    boolean decoupled)
@@ -280,11 +285,6 @@
         EasyMock.expectLastCall().andReturn(os);
         os.write(PAYLOAD.getBytes(), 0, PAYLOAD.length());
         EasyMock.expectLastCall();
-
-        URL decoupledURL = null;
-        if (decoupled) {
-            decoupledURL = new URL(NOWHERE + "response");
-        } 
         
         os.flush();
         EasyMock.expectLastCall();
@@ -294,28 +294,9 @@
         EasyMock.expectLastCall();
         
         verifyHandleResponse(decoupled);
-        
+
         control.replay();
         
-        Destination backChannel = null;
-        AbstractHttpHandler decoupledHandler = null;
-        if (decoupled) {
-            decoupledEngine.verifyCallCounts(new int[]{0, 0, 0});
-            backChannel = conduit.getBackChannel();
-            assertNotNull("expected back channel", backChannel);
-            decoupledEngine.verifyCallCounts(new int[]{1, 0, 1});
-            decoupledHandler = decoupledEngine.servants.get(decoupledURL);
-            assertNotNull("expected servant registered", decoupledHandler);
-            MessageObserver decoupledObserver =
-                ((HTTPConduit.DecoupledDestination)backChannel).getMessageObserver();
-            assertSame("unexpected decoupled destination",
-                       observer,       
-                       decoupledObserver);
-        } else {
-            backChannel = conduit.getBackChannel();
-            assertNull("unexpected back channel", backChannel);
-        }
-        
         wrappedOS.flush();
         wrappedOS.flush();
         wrappedOS.close();
@@ -334,13 +315,10 @@
         assertSame("unexpected content", is, inMessage.getContent(InputStream.class));
         
         if (decoupled) {
-            verifyDecoupledResponse(decoupledHandler);
+            verifyDecoupledResponse(conduit);
         }
         
         conduit.close();
-        if (decoupled) {
-            decoupledEngine.verifyCallCounts(new int[]{1, 1, 2});
-        }
         
         finalVerify();
     }
@@ -393,45 +371,19 @@
         EasyMock.expectLastCall().andReturn(is);
     }
     
-    private void verifyDecoupledResponse(AbstractHttpHandler decoupledHandler)
+    private void verifyDecoupledResponse(HTTPConduit conduit)
         throws IOException {
-        inMessage = null;
-        is = EasyMock.createMock(InputStream.class);
-        os = EasyMock.createMock(OutputStream.class);
-        TestHttpRequest decoupledRequest = new TestHttpRequest(is, parameters);
-        TestHttpResponse decoupledResponse = new TestHttpResponse(os);
-        decoupledHandler.handle("pathInContext",
-                                "pathParams",
-                                decoupledRequest,
-                                decoupledResponse);
-        assertNotNull("expected decoupled in message", inMessage);
-        assertNotNull("expected response headers",
-                      inMessage.get(Message.PROTOCOL_HEADERS));
+        Message incoming = new MessageImpl();
+        conduit.getDecoupledObserver().onMessage(incoming);
+        assertSame("expected pass thru onMessage() notification",
+                   inMessage,
+                   incoming);
         assertEquals("unexpected response code",
                      HttpURLConnection.HTTP_OK,
                      inMessage.get(Message.RESPONSE_CODE));
-
-        assertEquals("unexpected getInputStream count",
-                     1,
-                     decoupledRequest.getInputStreamCallCount());
-        //assertEquals("unexpected getParameters counts",
-        //             1,
-        //             decoupledRequest.getParametersCallCount());
-        assertTrue("unexpected content formats",
-                   inMessage.getContentFormats().contains(InputStream.class));
-        InputStream decoupledIS = inMessage.getContent(InputStream.class);
-        assertNotNull("unexpected content", decoupledIS);
-        
-        decoupledIS.close();
-        assertEquals("unexpected setHandled count",
-                     1,
-                     decoupledRequest.getHandledCallCount());
-        assertEquals("unexpected setHandled count",
-                     1,
-                     decoupledResponse.getCommitCallCount());
-        
-        inMessage.setContent(InputStream.class, is);
-
+        assertEquals("expected DECOUPLED_CHANNEL_MESSAGE flag set",
+                     Boolean.TRUE,
+                     inMessage.get(DECOUPLED_CHANNEL_MESSAGE));
     }
 
     private void finalVerify() {
@@ -443,42 +395,5 @@
     
     static EndpointReferenceType getEPR(String s) {
         return EndpointReferenceUtils.getEndpointReference(NOWHERE + s);
-    }
-    
-    /**
-     * EasyMock does not seem able to properly mock calls to ServerEngine -
-     * expectations set seem to be ignored.
-     */
-    private class TestServerEngine implements ServerEngine {
-        private int callCounts[] = {0, 0, 0};
-        private Map<URL, AbstractHttpHandler> servants =
-            new HashMap<URL, AbstractHttpHandler>();
-        
-        public void addServant(URL url, AbstractHttpHandler handler) {
-            callCounts[0]++;
-            servants.put(url, handler);
-        }
-
-        public void removeServant(URL url) {
-            callCounts[1]++;
-            servants.remove(url);
-        }
-
-        public HttpHandler getServant(URL url) {
-            callCounts[2]++;
-            return servants.get(url);
-        }
-
-        void verifyCallCounts(int expectedCallCounts[]) {
-            assertEquals("unexpected addServant call count",
-                         expectedCallCounts[0],
-                         callCounts[0]);
-            assertEquals("unexpected removeServant call count",
-                         expectedCallCounts[1],
-                         callCounts[1]);
-            assertEquals("unexpected getServant call count",
-                         expectedCallCounts[2],
-                         callCounts[2]);
-        }
     }
 }

Modified: incubator/cxf/trunk/rt/transports/http2/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/http2/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java?view=diff&rev=504504&r1=504503&r2=504504
==============================================================================
--- incubator/cxf/trunk/rt/transports/http2/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java (original)
+++ incubator/cxf/trunk/rt/transports/http2/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java Wed Feb  7 03:19:20 2007
@@ -19,7 +19,6 @@
 
 package org.apache.cxf.transport.http;
 
-import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -36,9 +35,6 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
 import org.apache.cxf.Bus;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.common.util.Base64Utility;
@@ -53,19 +49,19 @@
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.AbstractConduit;
-import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.Destination;
+import org.apache.cxf.transport.DestinationFactory;
+import org.apache.cxf.transport.DestinationFactoryManager;
 import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.transport.http.conduit.HTTPConduitConfigBean;
 import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
 import org.apache.cxf.ws.addressing.AttributedURIType;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.cxf.wsdl.EndpointReferenceUtils;
-import org.mortbay.jetty.HttpConnection;
-import org.mortbay.jetty.Request;
-import org.mortbay.jetty.handler.AbstractHandler;
 
 import static org.apache.cxf.message.Message.DECOUPLED_CHANNEL_MESSAGE;
+
+
 /**
  * HTTP Conduit implementation.
  */
@@ -80,9 +76,9 @@
     private URLConnectionFactory connectionFactory;
     private URL url;
     
-    private ServerEngine decoupledEngine;
-    private URL decoupledURL;
-    private DecoupledDestination decoupledDestination;
+    private Destination decoupledDestination;
+    private MessageObserver decoupledObserver;
+    private int decoupledDestinationRefCount;
     private EndpointInfo endpointInfo;
     
 
@@ -111,7 +107,6 @@
         this(b,
              ei,
              t,
-             null,
              null);
     }    
 
@@ -123,14 +118,12 @@
      * @param ei the endpoint info of the initiator
      * @param t the endpoint reference of the target
      * @param factory the URL connection factory
-     * @param eng the decoupled engine
      * @throws IOException
      */
     public HTTPConduit(Bus b,
                        EndpointInfo ei,
                        EndpointReferenceType t,
-                       URLConnectionFactory factory,
-                       ServerEngine eng) throws IOException {
+                       URLConnectionFactory factory) throws IOException {
         super(getTargetReference(ei, t));
         bus = b;
         endpointInfo = ei;
@@ -138,7 +131,6 @@
 
         initConfig();
         
-        decoupledEngine = eng;
         url = t == null
               ? new URL(endpointInfo.getAddress())
               : new URL(t.getAddress().getValue());
@@ -231,7 +223,7 @@
     public synchronized Destination getBackChannel() {
         if (decoupledDestination == null
             &&  config.getClient().getDecoupledEndpoint() != null) {
-            decoupledDestination = setUpDecoupledDestination(); 
+            setUpDecoupledDestination(); 
         }
         return decoupledDestination;
     }
@@ -255,12 +247,8 @@
         // in decoupled case, close response Destination if reference count
         // hits zero
         //
-        if (decoupledURL != null && decoupledEngine != null) {
-            DecoupledHandler decoupledHandler = 
-                (DecoupledHandler)decoupledEngine.getServant(decoupledURL);
-            if (decoupledHandler != null) {
-                decoupledHandler.release();
-            }
+        if (decoupledDestination != null) {
+            releaseDecoupledDestination();
         }
     }
 
@@ -358,10 +346,8 @@
 
     /**
      * Set up the decoupled Destination if necessary.
-     * 
-     * @return an appropriate decoupled Destination
      */
-    private DecoupledDestination setUpDecoupledDestination() {        
+    private void setUpDecoupledDestination() {        
         EndpointReferenceType reference =
             EndpointReferenceUtils.getEndpointReference(
                 config.getClient().getDecoupledEndpoint());
@@ -369,104 +355,50 @@
             String decoupledAddress = reference.getAddress().getValue();
             LOG.info("creating decoupled endpoint: " + decoupledAddress);
             try {
-                decoupledURL = new URL(decoupledAddress);
-                if (decoupledEngine == null) {
-                    decoupledEngine = 
-                        JettyHTTPServerEngine.getForPort(bus, 
-                                                         decoupledURL.getProtocol(),
-                                                         decoupledURL.getPort());
-                }
-                DecoupledHandler decoupledHandler =
-                    (DecoupledHandler)decoupledEngine.getServant(decoupledURL);
-                if (decoupledHandler == null) {
-                    decoupledHandler = new DecoupledHandler();
-                    decoupledEngine.addServant(decoupledURL, decoupledHandler);
-                } 
-                decoupledHandler.duplicate();
+                decoupledDestination = getDestination(decoupledAddress);
+                duplicateDecoupledDestination();
             } catch (Exception e) {
                 // REVISIT move message to localizable Messages.properties
                 LOG.log(Level.WARNING, "decoupled endpoint creation failed: ", e);
             }
         }
-        return new DecoupledDestination(reference, incomingObserver);
     }
-    
- 
 
     /**
-     * Wrapper output stream responsible for flushing headers and handling
-     * the incoming HTTP-level response (not necessarily the MEP response).
+     * @param address the address
+     * @return a Destination for the address, or null if no factory handles it
      */
-    private class WrappedOutputStream extends AbstractWrappedOutputStream {
-        protected URLConnection connection;
-        
-        WrappedOutputStream(Message m, URLConnection c) {
-            super(m);
-            connection = c;
+    private Destination getDestination(String address) throws IOException {
+        Destination destination = null;
+        DestinationFactoryManager factoryManager =
+            bus.getExtension(DestinationFactoryManager.class);
+        DestinationFactory factory =
+            factoryManager.getDestinationFactoryForUri(address);
+        if (factory != null) {
+            EndpointInfo ei = new EndpointInfo();
+            ei.setAddress(address);
+            destination = factory.getDestination(ei);
+            decoupledObserver = new InterposedMessageObserver();
+            destination.setMessageObserver(decoupledObserver);
         }
-
-        /**
-         * Perform any actions required on stream flush (freeze headers,
-         * reset output stream ... etc.)
-         */
-        protected void doFlush() throws IOException {
-            if (!alreadyFlushed()) {                
-                flushHeaders(outMessage);
-                if (connection instanceof HttpURLConnection) {            
-                    HttpURLConnection hc = (HttpURLConnection)connection;                    
-                    if (hc.getRequestMethod().equals("GET")) {
-                        return;
-                    }
-                }
-                resetOut(connection.getOutputStream(), true);
-            }
-        }
-
-        /**
-         * Perform any actions required on stream closure (handle response etc.)
-         */
-        protected void doClose() throws IOException {
-            handleResponse();
-        }
-        
-        protected void onWrite() throws IOException {
-            
-        }
-
-        private void handleResponse() throws IOException {
-            Exchange exchange = outMessage.getExchange();
-            int responseCode = getResponseCode(connection);
-            if (isOneway(exchange)
-                && !isPartialResponse(connection, responseCode)) {
-                // oneway operation without partial response
-                connection.getInputStream().close();
-                return;
-            }
-            
-            Message inMessage = new MessageImpl();
-            inMessage.setExchange(exchange);
-            InputStream in = null;
-            Map<String, List<String>> headers = new HashMap<String, List<String>>();
-            for (String key : connection.getHeaderFields().keySet()) {
-                headers.put(HttpHeaderHelper.getHeaderKey(key), connection.getHeaderFields().get(key));
-            }
-            inMessage.put(Message.PROTOCOL_HEADERS, headers);
-            inMessage.put(Message.RESPONSE_CODE, responseCode);
-            inMessage.put(Message.CONTENT_TYPE, connection.getHeaderField(HttpHeaderHelper.CONTENT_TYPE));
-
-            if (connection instanceof HttpURLConnection) {
-                HttpURLConnection hc = (HttpURLConnection)connection;
-                in = hc.getErrorStream();
-                if (null == in) {
-                    in = connection.getInputStream();
-                }
-            } else {
-                in = connection.getInputStream();
-            }
-            
-            inMessage.setContent(InputStream.class, in);
-            
-            incomingObserver.onMessage(inMessage);
+        return destination;
+    }
+    
+    /**
+     * @return the decoupled observer
+     */
+    protected MessageObserver getDecoupledObserver() {
+        return decoupledObserver;
+    }
+    
+    private synchronized void duplicateDecoupledDestination() {
+        decoupledDestinationRefCount++;
+    }
+    
+    private synchronized void releaseDecoupledDestination() {
+        if (--decoupledDestinationRefCount == 0) {
+            LOG.log(Level.INFO, "shutting down decoupled destination");
+            decoupledDestination.shutdown();
         }
     }
     
@@ -489,115 +421,6 @@
                && connection.getContentLength() != 0;
     }
 
-    /**
-     * Wrapper output stream responsible for commiting incoming request 
-     * containing a decoupled response.
-     */
-    private class WrapperInputStream extends FilterInputStream {
-        HttpServletRequest request;
-        HttpServletResponse response;
-        boolean closed;
-        
-        WrapperInputStream(InputStream is,
-                           HttpServletRequest req,
-                           HttpServletResponse resp) {
-            super(is);
-            request = req;
-            response = resp;
-        }
-        
-        public void close() throws IOException {
-            if (!closed) {
-                closed = true;
-                response.flushBuffer(); 
-                Request baseRequest = (request instanceof Request) 
-                    ? (Request)request : HttpConnection.getCurrentConnection().getRequest();
-                baseRequest.setHandled(true);                
-            }
-        }
-    }
-    
-    /**
-     * Represented decoupled response endpoint.
-     */
-    protected class DecoupledDestination implements Destination {
-        protected MessageObserver decoupledMessageObserver;
-        private EndpointReferenceType address;
-        
-        DecoupledDestination(EndpointReferenceType ref,
-                             MessageObserver incomingObserver) {
-            address = ref;
-            decoupledMessageObserver = incomingObserver;
-        }
-
-        public EndpointReferenceType getAddress() {
-            return address;
-        }
-
-        public Conduit getBackChannel(Message inMessage,
-                                      Message partialResponse,
-                                      EndpointReferenceType addr)
-            throws IOException {
-            // shouldn't be called on decoupled endpoint
-            return null;
-        }
-
-        public void shutdown() {
-            // TODO Auto-generated method stub            
-        }
-
-        public synchronized void setMessageObserver(MessageObserver observer) {
-            decoupledMessageObserver = observer;
-        }
-        
-        protected synchronized MessageObserver getMessageObserver() {
-            return decoupledMessageObserver;
-        }
-    }
-
-    /**
-     * Handles incoming decoupled responses.
-     */
-    private class DecoupledHandler extends AbstractHandler {
-        private int refCount;
-                
-        synchronized void duplicate() {
-            refCount++;
-        }
-        
-        synchronized void release() {
-            if (--refCount == 0) {
-                decoupledEngine.removeServant(decoupledURL);
-                JettyHTTPServerEngine.destroyForPort(decoupledURL.getPort());
-            }
-        }
-        
-        public void handle(String targetURI,
-                           HttpServletRequest req,
-                           HttpServletResponse resp,
-                           int dispatch) throws IOException {
-            InputStream responseStream = req.getInputStream();
-            Message inMessage = new MessageImpl();
-            // disposable exchange, swapped with real Exchange on correlation
-            inMessage.setExchange(new ExchangeImpl());
-            inMessage.put(DECOUPLED_CHANNEL_MESSAGE, Boolean.TRUE);
-            // REVISIT: how to get response headers?
-            //inMessage.put(Message.PROTOCOL_HEADERS, req.getXXX());
-            setHeaders(inMessage);
-            inMessage.put(Message.ENCODING, resp.getCharacterEncoding());
-            inMessage.put(Message.CONTENT_TYPE, resp.getContentType());
-            inMessage.put(Message.RESPONSE_CODE, HttpURLConnection.HTTP_OK);
-            InputStream is = new WrapperInputStream(responseStream, req, resp);
-            inMessage.setContent(InputStream.class, is);
-
-            try {
-                decoupledDestination.getMessageObserver().onMessage(inMessage);    
-            } finally {
-                is.close();
-            }
-        }
-    }    
-
     private void initConfig() {
         config = new ConfigBean();
         // Initialize some default values for the configuration
@@ -722,5 +545,106 @@
             }
             return null;
         }        
+    }
+    
+    /**
+     * Wrapper output stream responsible for flushing headers and handling
+     * the incoming HTTP-level response (not necessarily the MEP response).
+     */
+    private class WrappedOutputStream extends AbstractWrappedOutputStream {
+        protected URLConnection connection;
+        
+        WrappedOutputStream(Message m, URLConnection c) {
+            super(m);
+            connection = c;
+        }
+
+        /**
+         * Perform any actions required on stream flush (freeze headers,
+         * reset output stream ... etc.)
+         */
+        protected void doFlush() throws IOException {
+            if (!alreadyFlushed()) {                
+                flushHeaders(outMessage);
+                if (connection instanceof HttpURLConnection) {            
+                    HttpURLConnection hc = (HttpURLConnection)connection;                    
+                    if (hc.getRequestMethod().equals("GET")) {
+                        return;
+                    }
+                }
+                resetOut(connection.getOutputStream(), true);
+            }
+        }
+
+        /**
+         * Perform any actions required on stream closure (handle response etc.)
+         */
+        protected void doClose() throws IOException {
+            handleResponse();
+        }
+        
+        protected void onWrite() throws IOException {
+            
+        }
+
+        private void handleResponse() throws IOException {
+            Exchange exchange = outMessage.getExchange();
+            int responseCode = getResponseCode(connection);
+            if (isOneway(exchange)
+                && !isPartialResponse(connection, responseCode)) {
+                // oneway operation without partial response
+                connection.getInputStream().close();
+                return;
+            }
+            
+            Message inMessage = new MessageImpl();
+            inMessage.setExchange(exchange);
+            InputStream in = null;
+            Map<String, List<String>> headers = new HashMap<String, List<String>>();
+            for (String key : connection.getHeaderFields().keySet()) {
+                headers.put(HttpHeaderHelper.getHeaderKey(key), connection.getHeaderFields().get(key));
+            }
+            inMessage.put(Message.PROTOCOL_HEADERS, headers);
+            inMessage.put(Message.RESPONSE_CODE, responseCode);
+            inMessage.put(Message.CONTENT_TYPE, connection.getHeaderField(HttpHeaderHelper.CONTENT_TYPE));
+
+            if (connection instanceof HttpURLConnection) {
+                HttpURLConnection hc = (HttpURLConnection)connection;
+                in = hc.getErrorStream();
+                if (null == in) {
+                    in = connection.getInputStream();
+                }
+            } else {
+                in = connection.getInputStream();
+            }
+            
+            inMessage.setContent(InputStream.class, in);
+            
+            incomingObserver.onMessage(inMessage);
+        }
+    }
+    
+    /**
+     * Used to set appropriate message properties, exchange etc.
+     * as required for an incoming decoupled response (as opposed
+     * what's normally set by the Destination for an incoming
+     * request).
+     */
+    protected class InterposedMessageObserver implements MessageObserver {
+        /**
+         * Called for an incoming message.
+         * 
+         * @param inMessage the incoming message
+         */
+        public void onMessage(Message inMessage) {
+            // disposable exchange, swapped with real Exchange on correlation
+            inMessage.setExchange(new ExchangeImpl());
+            inMessage.put(DECOUPLED_CHANNEL_MESSAGE, Boolean.TRUE);
+            // REVISIT: how to get response headers?
+            //inMessage.put(Message.PROTOCOL_HEADERS, req.getXXX());
+            setHeaders(inMessage);
+            inMessage.put(Message.RESPONSE_CODE, HttpURLConnection.HTTP_OK);
+            incomingObserver.onMessage(inMessage);
+        }
     }
 }

Modified: incubator/cxf/trunk/rt/transports/http2/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/http2/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java?view=diff&rev=504504&r1=504503&r2=504504
==============================================================================
--- incubator/cxf/trunk/rt/transports/http2/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java (original)
+++ incubator/cxf/trunk/rt/transports/http2/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java Wed Feb  7 03:19:20 2007
@@ -33,10 +33,8 @@
 import java.util.List;
 import java.util.Map;
 
-import javax.servlet.ServletException;
 import javax.servlet.ServletInputStream;
 import javax.servlet.ServletOutputStream;
-import javax.servlet.http.HttpServletResponse;
 
 import junit.framework.TestCase;
 
@@ -45,17 +43,18 @@
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.service.model.EndpointInfo;
-import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.Destination;
+import org.apache.cxf.transport.DestinationFactory;
+import org.apache.cxf.transport.DestinationFactoryManager;
 import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.transport.http.conduit.HTTPConduitConfigBean;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.cxf.wsdl.EndpointReferenceUtils;
 import org.easymock.classextension.EasyMock;
 import org.easymock.classextension.IMocksControl;
-import org.mortbay.jetty.Handler;
-import org.mortbay.jetty.Request;
-import org.mortbay.jetty.handler.AbstractHandler;
+
+import static org.apache.cxf.message.Message.DECOUPLED_CHANNEL_MESSAGE;
+
 
 public class HTTPConduitTest extends TestCase {
     private static final String NOWHERE = "http://nada.nothing.nowhere.null/";
@@ -69,8 +68,6 @@
     private MessageObserver observer;
     private ServletOutputStream os;
     private ServletInputStream is;
-    private TestServerEngine decoupledEngine;
-    //private MultiMap parameters;
     private IMocksControl control;
     
     public void setUp() throws Exception {
@@ -89,8 +86,6 @@
         observer = null;
         os = null;
         is = null;
-        //parameters = null;
-        decoupledEngine = null;
     }
 
     public void testGetTarget() throws Exception {
@@ -205,23 +200,33 @@
                     EasyMock.expectLastCall();                    
                 }
             }
-            
-            if (decoupled) {
-                decoupledEngine = new TestServerEngine();
-                //parameters = control.createMock(MultiMap.class);
-            }            
-            
         }
                
+        CXFBusImpl bus = new CXFBusImpl();
+        URL decoupledURL = null;
+        if (decoupled) {
+            decoupledURL = new URL(NOWHERE + "response");
+            DestinationFactoryManager mgr =
+                control.createMock(DestinationFactoryManager.class);
+            DestinationFactory factory =
+                control.createMock(DestinationFactory.class);
+            Destination destination =
+                control.createMock(Destination.class);
+
+            bus.setExtension(mgr, DestinationFactoryManager.class);
+            mgr.getDestinationFactoryForUri(decoupledURL.toString());
+            EasyMock.expectLastCall().andReturn(factory);
+            factory.getDestination(EasyMock.isA(EndpointInfo.class));
+            EasyMock.expectLastCall().andReturn(destination);
+            destination.setMessageObserver(EasyMock.isA(HTTPConduit.InterposedMessageObserver.class));
+        }
         
         control.replay();
 
-        CXFBusImpl bus = new CXFBusImpl();
         HTTPConduit conduit = new HTTPConduit(bus, 
                                               endpointInfo,
                                               null,
-                                              connectionFactory,
-                                              decoupledEngine);
+                                              connectionFactory);
         conduit.retrieveConnectionFactory();
 
         HTTPConduitConfigBean config = conduit.getConfig();
@@ -235,16 +240,14 @@
                 } 
             }
         }
-
+        
         if (decoupled) {
-            URL decoupledURL = null;
-            if (decoupled) {
-                decoupledURL = new URL(NOWHERE + "response");
-                config.getClient().setDecoupledEndpoint(decoupledURL.toString());
-            } 
+            config.getClient().setDecoupledEndpoint(decoupledURL.toString());
+            assertNotNull("expected back channel", conduit.getBackChannel());
+        } else {
+            assertNull("unexpected back channel", conduit.getBackChannel());
         }
        
-
         observer = new MessageObserver() {
             public void onMessage(Message m) {
                 inMessage = m;
@@ -254,19 +257,19 @@
         return conduit;
     }
     
-    private void verifySentMessage(Conduit conduit, Message message)
+    private void verifySentMessage(HTTPConduit conduit, Message message)
         throws IOException {
         verifySentMessage(conduit, message, false);
     }
 
-    private void verifySentMessage(Conduit conduit,
+    private void verifySentMessage(HTTPConduit conduit,
                                    Message message,
                                    boolean expectHeaders)
         throws IOException {
         verifySentMessage(conduit, message, expectHeaders, false);
     }
     
-    private void verifySentMessage(Conduit conduit,
+    private void verifySentMessage(HTTPConduit conduit,
                                    Message message,
                                    boolean expectHeaders,
                                    boolean decoupled)
@@ -287,11 +290,6 @@
         EasyMock.expectLastCall().andReturn(os);
         os.write(PAYLOAD.getBytes(), 0, PAYLOAD.length());
         EasyMock.expectLastCall();
-
-        URL decoupledURL = null;
-        if (decoupled) {
-            decoupledURL = new URL(NOWHERE + "response");
-        } 
         
         os.flush();
         EasyMock.expectLastCall();
@@ -303,26 +301,7 @@
         verifyHandleResponse(decoupled);
         
         control.replay();
-        
-        Destination backChannel = null;
-        AbstractHandler decoupledHandler = null;
-        if (decoupled) {
-            decoupledEngine.verifyCallCounts(new int[]{0, 0, 0});
-            backChannel = conduit.getBackChannel();
-            assertNotNull("expected back channel", backChannel);
-            decoupledEngine.verifyCallCounts(new int[]{1, 0, 1});
-            decoupledHandler = decoupledEngine.servants.get(decoupledURL);
-            assertNotNull("expected servant registered", decoupledHandler);
-            MessageObserver decoupledObserver =
-                ((HTTPConduit.DecoupledDestination)backChannel).getMessageObserver();
-            assertSame("unexpected decoupled destination",
-                       observer,       
-                       decoupledObserver);
-        } else {
-            backChannel = conduit.getBackChannel();
-            assertNull("unexpected back channel", backChannel);
-        }
-        
+                
         wrappedOS.flush();
         wrappedOS.flush();
         wrappedOS.close();
@@ -341,13 +320,10 @@
         assertSame("unexpected content", is, inMessage.getContent(InputStream.class));
         
         if (decoupled) {
-            verifyDecoupledResponse(decoupledHandler);
+            verifyDecoupledResponse(conduit);
         }
         
         conduit.close();
-        if (decoupled) {
-            decoupledEngine.verifyCallCounts(new int[]{1, 1, 2});
-        }
         
         finalVerify();
     }
@@ -400,49 +376,19 @@
         EasyMock.expectLastCall().andReturn(is);
     }
     
-    private void verifyDecoupledResponse(AbstractHandler decoupledHandler)
+    private void verifyDecoupledResponse(HTTPConduit conduit)
         throws IOException {
-        inMessage = null;
-        is = EasyMock.createMock(ServletInputStream.class);
-        os = EasyMock.createMock(ServletOutputStream.class);
-        Request decoupledRequest = EasyMock.createMock(Request.class);
-        decoupledRequest.getInputStream();
-        EasyMock.expectLastCall().andReturn(is);
-        decoupledRequest.setHandled(true);
-        EasyMock.replay(decoupledRequest);
-        
-        HttpServletResponse decoupledResponse = EasyMock.createMock(HttpServletResponse.class);
-        decoupledResponse.getCharacterEncoding();
-        EasyMock.expectLastCall().andReturn("utf8");
-        decoupledResponse.getContentType();
-        EasyMock.expectLastCall().andReturn("test");
-        decoupledResponse.flushBuffer();
-        EasyMock.expectLastCall();
-        EasyMock.replay(decoupledResponse);
-       
-        try {
-            decoupledHandler.handle("pathInContext",                                
-                                    decoupledRequest,
-                                    decoupledResponse, Handler.REQUEST);
-        } catch (ServletException e) {
-            fail("There should not throw the serletException");
-        }
-        assertNotNull("expected decoupled in message", inMessage);
-        assertNotNull("expected response headers",
-                      inMessage.get(Message.PROTOCOL_HEADERS));
+        Message incoming = new MessageImpl();
+        conduit.getDecoupledObserver().onMessage(incoming);
+        assertSame("expected pass thru onMessage() notification",
+                   inMessage,
+                   incoming);
         assertEquals("unexpected response code",
                      HttpURLConnection.HTTP_OK,
                      inMessage.get(Message.RESPONSE_CODE));
-
-        assertTrue("unexpected content formats",
-                   inMessage.getContentFormats().contains(InputStream.class));
-        InputStream decoupledIS = inMessage.getContent(InputStream.class);
-        assertNotNull("unexpected content", decoupledIS);
-        
-        decoupledIS.close();
-        
-        inMessage.setContent(InputStream.class, is);
-
+        assertEquals("expected DECOUPLED_CHANNEL_MESSAGE flag set",
+                     Boolean.TRUE,
+                     inMessage.get(DECOUPLED_CHANNEL_MESSAGE));
     }
 
     private void finalVerify() {
@@ -454,43 +400,5 @@
     
     static EndpointReferenceType getEPR(String s) {
         return EndpointReferenceUtils.getEndpointReference(NOWHERE + s);
-    }
-    
-    /**
-     * EasyMock does not seem able to properly mock calls to ServerEngine -
-     * expectations set seem to be ignored.
-     */
-    private class TestServerEngine implements ServerEngine {
-        private int callCounts[] = {0, 0, 0};
-        private Map<URL, AbstractHandler> servants =
-            new HashMap<URL, AbstractHandler>();
-        
-        public void addServant(URL url, AbstractHandler handler) {
-            callCounts[0]++;
-            servants.put(url, handler);
-        }
-
-        public void removeServant(URL url) {
-            callCounts[1]++;
-            servants.remove(url);
-        }
-
-        public Handler getServant(URL url) {
-            callCounts[2]++;
-            return servants.get(url);
-        }
-
-        void verifyCallCounts(int expectedCallCounts[]) {
-            assertEquals("unexpected addServant call count",
-                         expectedCallCounts[0],
-                         callCounts[0]);
-            assertEquals("unexpected removeServant call count",
-                         expectedCallCounts[1],
-                         callCounts[1]);
-            assertEquals("unexpected getServant call count",
-                         expectedCallCounts[2],
-                         callCounts[2]);
-        }
-        
     }
 }