You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by eg...@apache.org on 2007/02/07 12:19:21 UTC
svn commit: r504504 - in /incubator/cxf/trunk/rt/transports:
http/src/main/java/org/apache/cxf/transport/http/
http/src/test/java/org/apache/cxf/transport/http/
http2/src/main/java/org/apache/cxf/transport/http/
http2/src/test/java/org/apache/cxf/trans...
Author: eglynn
Date: Wed Feb 7 03:19:20 2007
New Revision: 504504
URL: http://svn.apache.org/viewvc?view=rev&rev=504504
Log:
Handle incoming decoupled response via an interposed MessageObserver on an HTTP Destination, so as to remove HTTPConduit dependency on Jetty.
Modified:
incubator/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
incubator/cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java
incubator/cxf/trunk/rt/transports/http2/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
incubator/cxf/trunk/rt/transports/http2/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java
Modified: incubator/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java?view=diff&rev=504504&r1=504503&r2=504504
==============================================================================
--- incubator/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java (original)
+++ incubator/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java Wed Feb 7 03:19:20 2007
@@ -19,7 +19,6 @@
package org.apache.cxf.transport.http;
-import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -51,16 +50,14 @@
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.AbstractConduit;
-import org.apache.cxf.transport.Conduit;
import org.apache.cxf.transport.Destination;
+import org.apache.cxf.transport.DestinationFactory;
+import org.apache.cxf.transport.DestinationFactoryManager;
import org.apache.cxf.transport.MessageObserver;
import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
import org.apache.cxf.ws.addressing.AttributedURIType;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.wsdl.EndpointReferenceUtils;
-import org.mortbay.http.HttpRequest;
-import org.mortbay.http.HttpResponse;
-import org.mortbay.http.handler.AbstractHttpHandler;
import static org.apache.cxf.message.Message.DECOUPLED_CHANNEL_MESSAGE;
@@ -77,9 +74,9 @@
private URLConnectionFactory connectionFactory;
private URL url;
- private ServerEngine decoupledEngine;
- private URL decoupledURL;
- private DecoupledDestination decoupledDestination;
+ private Destination decoupledDestination;
+ private MessageObserver decoupledObserver;
+ private int decoupledDestinationRefCount;
private EndpointInfo endpointInfo;
// Configuration values
@@ -114,7 +111,6 @@
this(b,
ei,
t,
- null,
null);
}
@@ -126,14 +122,12 @@
* @param ei the endpoint info of the initiator
* @param t the endpoint reference of the target
* @param factory the URL connection factory
- * @param eng the decoupled engine
* @throws IOException
*/
public HTTPConduit(Bus b,
EndpointInfo ei,
EndpointReferenceType t,
- URLConnectionFactory factory,
- ServerEngine eng) throws IOException {
+ URLConnectionFactory factory) throws IOException {
super(getTargetReference(ei, t));
bus = b;
endpointInfo = ei;
@@ -141,7 +135,6 @@
initConfig();
- decoupledEngine = eng;
url = t == null
? new URL(endpointInfo.getAddress())
: new URL(t.getAddress().getValue());
@@ -231,7 +224,7 @@
public synchronized Destination getBackChannel() {
if (decoupledDestination == null
&& getClient().getDecoupledEndpoint() != null) {
- decoupledDestination = setUpDecoupledDestination();
+ setUpDecoupledDestination();
}
return decoupledDestination;
}
@@ -255,12 +248,8 @@
// in decoupled case, close response Destination if reference count
// hits zero
//
- if (decoupledURL != null && decoupledEngine != null) {
- DecoupledHandler decoupledHandler =
- (DecoupledHandler)decoupledEngine.getServant(decoupledURL);
- if (decoupledHandler != null) {
- decoupledHandler.release();
- }
+ if (decoupledDestination != null) {
+ releaseDecoupledDestination();
}
}
@@ -358,10 +347,8 @@
/**
* Set up the decoupled Destination if necessary.
- *
- * @return an appropriate decoupled Destination
*/
- private DecoupledDestination setUpDecoupledDestination() {
+ private void setUpDecoupledDestination() {
EndpointReferenceType reference =
EndpointReferenceUtils.getEndpointReference(
getClient().getDecoupledEndpoint());
@@ -369,102 +356,50 @@
String decoupledAddress = reference.getAddress().getValue();
LOG.info("creating decoupled endpoint: " + decoupledAddress);
try {
- decoupledURL = new URL(decoupledAddress);
- if (decoupledEngine == null) {
- decoupledEngine =
- JettyHTTPServerEngine.getForPort(bus,
- decoupledURL.getProtocol(),
- decoupledURL.getPort());
- }
- DecoupledHandler decoupledHandler =
- (DecoupledHandler)decoupledEngine.getServant(decoupledURL);
- if (decoupledHandler == null) {
- decoupledHandler = new DecoupledHandler();
- decoupledEngine.addServant(decoupledURL, decoupledHandler);
- }
- decoupledHandler.duplicate();
+ decoupledDestination = getDestination(decoupledAddress);
+ duplicateDecoupledDestination();
} catch (Exception e) {
// REVISIT move message to localizable Messages.properties
LOG.log(Level.WARNING, "decoupled endpoint creation failed: ", e);
}
}
- return new DecoupledDestination(reference, incomingObserver);
}
-
+
/**
- * Wrapper output stream responsible for flushing headers and handling
- * the incoming HTTP-level response (not necessarily the MEP response).
+ * @param address the address
+ * @return a Destination for the address
*/
- private class WrappedOutputStream extends AbstractWrappedOutputStream {
- protected URLConnection connection;
-
- WrappedOutputStream(Message m, URLConnection c) {
- super(m);
- connection = c;
- }
-
- /**
- * Perform any actions required on stream flush (freeze headers,
- * reset output stream ... etc.)
- */
- protected void doFlush() throws IOException {
- if (!alreadyFlushed()) {
- flushHeaders(outMessage);
- if (connection instanceof HttpURLConnection) {
- HttpURLConnection hc = (HttpURLConnection)connection;
- if (hc.getRequestMethod().equals("GET")) {
- return;
- }
- }
- resetOut(connection.getOutputStream(), true);
- }
- }
-
- /**
- * Perform any actions required on stream closure (handle response etc.)
- */
- protected void doClose() throws IOException {
- handleResponse();
+ private Destination getDestination(String address) throws IOException {
+ Destination destination = null;
+ DestinationFactoryManager factoryManager =
+ bus.getExtension(DestinationFactoryManager.class);
+ DestinationFactory factory =
+ factoryManager.getDestinationFactoryForUri(address);
+ if (factory != null) {
+ EndpointInfo ei = new EndpointInfo();
+ ei.setAddress(address);
+ destination = factory.getDestination(ei);
+ decoupledObserver = new InterposedMessageObserver();
+ destination.setMessageObserver(decoupledObserver);
}
-
- protected void onWrite() throws IOException {
-
- }
-
- private void handleResponse() throws IOException {
- Exchange exchange = outMessage.getExchange();
- int responseCode = getResponseCode(connection);
- if (isOneway(exchange)
- && !isPartialResponse(connection, responseCode)) {
- // oneway operation without partial response
- connection.getInputStream().close();
- return;
- }
-
- Message inMessage = new MessageImpl();
- inMessage.setExchange(exchange);
- InputStream in = null;
- Map<String, List<String>> headers = new HashMap<String, List<String>>();
- for (String key : connection.getHeaderFields().keySet()) {
- headers.put(HttpHeaderHelper.getHeaderKey(key), connection.getHeaderFields().get(key));
- }
- inMessage.put(Message.PROTOCOL_HEADERS, headers);
- inMessage.put(Message.RESPONSE_CODE, responseCode);
- inMessage.put(Message.CONTENT_TYPE, connection.getHeaderField(HttpHeaderHelper.CONTENT_TYPE));
-
- if (connection instanceof HttpURLConnection) {
- HttpURLConnection hc = (HttpURLConnection)connection;
- in = hc.getErrorStream();
- if (null == in) {
- in = connection.getInputStream();
- }
- } else {
- in = connection.getInputStream();
- }
-
- inMessage.setContent(InputStream.class, in);
-
- incomingObserver.onMessage(inMessage);
+ return destination;
+ }
+
+ /**
+ * @return the decoupled observer
+ */
+ protected MessageObserver getDecoupledObserver() {
+ return decoupledObserver;
+ }
+
+ private synchronized void duplicateDecoupledDestination() {
+ decoupledDestinationRefCount++;
+ }
+
+ private synchronized void releaseDecoupledDestination() {
+ if (--decoupledDestinationRefCount == 0) {
+ LOG.log(Level.INFO, "shutting down decoupled destination");
+ decoupledDestination.shutdown();
}
}
@@ -487,113 +422,6 @@
&& connection.getContentLength() != 0;
}
- /**
- * Wrapper output stream responsible for commiting incoming request
- * containing a decoupled response.
- */
- private class WrapperInputStream extends FilterInputStream {
- HttpRequest request;
- HttpResponse response;
- boolean closed;
-
- WrapperInputStream(InputStream is,
- HttpRequest req,
- HttpResponse resp) {
- super(is);
- request = req;
- response = resp;
- }
-
- public void close() throws IOException {
- if (!closed) {
- closed = true;
- response.commit();
- request.setHandled(true);
- }
- }
- }
-
- /**
- * Represented decoupled response endpoint.
- */
- protected class DecoupledDestination implements Destination {
- protected MessageObserver decoupledMessageObserver;
- private EndpointReferenceType address;
-
- DecoupledDestination(EndpointReferenceType ref,
- MessageObserver incomingObserver) {
- address = ref;
- decoupledMessageObserver = incomingObserver;
- }
-
- public EndpointReferenceType getAddress() {
- return address;
- }
-
- public Conduit getBackChannel(Message inMessage,
- Message partialResponse,
- EndpointReferenceType addr)
- throws IOException {
- // shouldn't be called on decoupled endpoint
- return null;
- }
-
- public void shutdown() {
- // TODO Auto-generated method stub
- }
-
- public synchronized void setMessageObserver(MessageObserver observer) {
- decoupledMessageObserver = observer;
- }
-
- protected synchronized MessageObserver getMessageObserver() {
- return decoupledMessageObserver;
- }
- }
-
- /**
- * Handles incoming decoupled responses.
- */
- private class DecoupledHandler extends AbstractHttpHandler {
- private int refCount;
-
- synchronized void duplicate() {
- refCount++;
- }
-
- synchronized void release() {
- if (--refCount == 0) {
- decoupledEngine.removeServant(decoupledURL);
- JettyHTTPServerEngine.destroyForPort(decoupledURL.getPort());
- }
- }
-
- public void handle(String pathInContext,
- String pathParams,
- HttpRequest req,
- HttpResponse resp) throws IOException {
- InputStream responseStream = req.getInputStream();
- Message inMessage = new MessageImpl();
- // disposable exchange, swapped with real Exchange on correlation
- inMessage.setExchange(new ExchangeImpl());
- inMessage.put(DECOUPLED_CHANNEL_MESSAGE, Boolean.TRUE);
- // REVISIT: how to get response headers?
- //inMessage.put(Message.PROTOCOL_HEADERS, req.getXXX());
- setHeaders(inMessage);
- inMessage.put(Message.ENCODING, resp.getCharacterEncoding());
- inMessage.put(Message.CONTENT_TYPE, resp.getContentType());
- inMessage.put(Message.RESPONSE_CODE, HttpURLConnection.HTTP_OK);
- InputStream is = new WrapperInputStream(responseStream, req, resp);
- inMessage.setContent(InputStream.class, is);
-
- try {
- decoupledDestination.getMessageObserver().onMessage(inMessage);
- } finally {
- is.close();
- }
- }
- }
-
private void initConfig() {
// Initialize some default values for the configuration
client = endpointInfo.getTraversedExtensor(new HTTPClientPolicy(), HTTPClientPolicy.class);
@@ -749,5 +577,104 @@
this.sslClient = sslClient;
}
-
+ /**
+ * Wrapper output stream responsible for flushing headers and handling
+ * the incoming HTTP-level response (not necessarily the MEP response).
+ */
+ private class WrappedOutputStream extends AbstractWrappedOutputStream {
+ protected URLConnection connection;
+
+ WrappedOutputStream(Message m, URLConnection c) {
+ super(m);
+ connection = c;
+ }
+
+ /**
+ * Perform any actions required on stream flush (freeze headers,
+ * reset output stream ... etc.)
+ */
+ protected void doFlush() throws IOException {
+ if (!alreadyFlushed()) {
+ flushHeaders(outMessage);
+ if (connection instanceof HttpURLConnection) {
+ HttpURLConnection hc = (HttpURLConnection)connection;
+ if (hc.getRequestMethod().equals("GET")) {
+ return;
+ }
+ }
+ resetOut(connection.getOutputStream(), true);
+ }
+ }
+
+ /**
+ * Perform any actions required on stream closure (handle response etc.)
+ */
+ protected void doClose() throws IOException {
+ handleResponse();
+ }
+
+ protected void onWrite() throws IOException {
+
+ }
+
+ private void handleResponse() throws IOException {
+ Exchange exchange = outMessage.getExchange();
+ int responseCode = getResponseCode(connection);
+ if (isOneway(exchange)
+ && !isPartialResponse(connection, responseCode)) {
+ // oneway operation without partial response
+ connection.getInputStream().close();
+ return;
+ }
+
+ Message inMessage = new MessageImpl();
+ inMessage.setExchange(exchange);
+ InputStream in = null;
+ Map<String, List<String>> headers = new HashMap<String, List<String>>();
+ for (String key : connection.getHeaderFields().keySet()) {
+ headers.put(HttpHeaderHelper.getHeaderKey(key), connection.getHeaderFields().get(key));
+ }
+ inMessage.put(Message.PROTOCOL_HEADERS, headers);
+ inMessage.put(Message.RESPONSE_CODE, responseCode);
+ inMessage.put(Message.CONTENT_TYPE, connection.getHeaderField(HttpHeaderHelper.CONTENT_TYPE));
+
+ if (connection instanceof HttpURLConnection) {
+ HttpURLConnection hc = (HttpURLConnection)connection;
+ in = hc.getErrorStream();
+ if (null == in) {
+ in = connection.getInputStream();
+ }
+ } else {
+ in = connection.getInputStream();
+ }
+
+ inMessage.setContent(InputStream.class, in);
+
+ incomingObserver.onMessage(inMessage);
+ }
+ }
+
+ /**
+ * Used to set appropriate message properties, exchange etc.
+ * as required for an incoming decoupled response (as opposed
+ * to what's normally set by the Destination for an incoming
+ * request).
+ */
+ protected class InterposedMessageObserver implements MessageObserver {
+ /**
+ * Called for an incoming message.
+ *
+ * @param inMessage
+ */
+ public void onMessage(Message inMessage) {
+ // disposable exchange, swapped with real Exchange on correlation
+ inMessage.setExchange(new ExchangeImpl());
+ inMessage.put(DECOUPLED_CHANNEL_MESSAGE, Boolean.TRUE);
+ // REVISIT: how to get response headers?
+ //inMessage.put(Message.PROTOCOL_HEADERS, req.getXXX());
+ setHeaders(inMessage);
+ inMessage.put(Message.RESPONSE_CODE, HttpURLConnection.HTTP_OK);
+ incomingObserver.onMessage(inMessage);
+ }
+ }
}
Modified: incubator/cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java?view=diff&rev=504504&r1=504503&r2=504504
==============================================================================
--- incubator/cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java (original)
+++ incubator/cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java Wed Feb 7 03:19:20 2007
@@ -40,16 +40,17 @@
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.service.model.EndpointInfo;
-import org.apache.cxf.transport.Conduit;
import org.apache.cxf.transport.Destination;
+import org.apache.cxf.transport.DestinationFactory;
+import org.apache.cxf.transport.DestinationFactoryManager;
import org.apache.cxf.transport.MessageObserver;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.wsdl.EndpointReferenceUtils;
import org.easymock.classextension.EasyMock;
import org.easymock.classextension.IMocksControl;
-import org.mortbay.http.HttpHandler;
-import org.mortbay.http.handler.AbstractHttpHandler;
-import org.mortbay.util.MultiMap;
+
+import static org.apache.cxf.message.Message.DECOUPLED_CHANNEL_MESSAGE;
+
public class HTTPConduitTest extends TestCase {
private static final String NOWHERE = "http://nada.nothing.nowhere.null/";
@@ -63,8 +64,6 @@
private MessageObserver observer;
private OutputStream os;
private InputStream is;
- private TestServerEngine decoupledEngine;
- private MultiMap parameters;
private IMocksControl control;
public void setUp() throws Exception {
@@ -83,8 +82,6 @@
observer = null;
os = null;
is = null;
- parameters = null;
- decoupledEngine = null;
}
public void testGetTarget() throws Exception {
@@ -198,24 +195,34 @@
((HttpURLConnection)connection).setChunkedStreamingMode(2048);
EasyMock.expectLastCall();
}
- }
-
- if (decoupled) {
- decoupledEngine = new TestServerEngine();
- parameters = control.createMock(MultiMap.class);
- }
-
+ }
+ }
+
+ CXFBusImpl bus = new CXFBusImpl();
+ URL decoupledURL = null;
+ if (decoupled) {
+ decoupledURL = new URL(NOWHERE + "response");
+ DestinationFactoryManager mgr =
+ control.createMock(DestinationFactoryManager.class);
+ DestinationFactory factory =
+ control.createMock(DestinationFactory.class);
+ Destination destination =
+ control.createMock(Destination.class);
+
+ bus.setExtension(mgr, DestinationFactoryManager.class);
+ mgr.getDestinationFactoryForUri(decoupledURL.toString());
+ EasyMock.expectLastCall().andReturn(factory);
+ factory.getDestination(EasyMock.isA(EndpointInfo.class));
+ EasyMock.expectLastCall().andReturn(destination);
+ destination.setMessageObserver(EasyMock.isA(HTTPConduit.InterposedMessageObserver.class));
}
-
control.replay();
- CXFBusImpl bus = new CXFBusImpl();
HTTPConduit conduit = new HTTPConduit(bus,
endpointInfo,
null,
- connectionFactory,
- decoupledEngine);
+ connectionFactory);
conduit.retrieveConnectionFactory();
if (send) {
@@ -230,13 +237,11 @@
}
if (decoupled) {
- URL decoupledURL = null;
- if (decoupled) {
- decoupledURL = new URL(NOWHERE + "response");
- conduit.getClient().setDecoupledEndpoint(decoupledURL.toString());
- }
+ conduit.getClient().setDecoupledEndpoint(decoupledURL.toString());
+ assertNotNull("expected back channel", conduit.getBackChannel());
+ } else {
+ assertNull("unexpected back channel", conduit.getBackChannel());
}
-
observer = new MessageObserver() {
public void onMessage(Message m) {
@@ -247,19 +252,19 @@
return conduit;
}
- private void verifySentMessage(Conduit conduit, Message message)
+ private void verifySentMessage(HTTPConduit conduit, Message message)
throws IOException {
verifySentMessage(conduit, message, false);
}
- private void verifySentMessage(Conduit conduit,
+ private void verifySentMessage(HTTPConduit conduit,
Message message,
boolean expectHeaders)
throws IOException {
verifySentMessage(conduit, message, expectHeaders, false);
}
- private void verifySentMessage(Conduit conduit,
+ private void verifySentMessage(HTTPConduit conduit,
Message message,
boolean expectHeaders,
boolean decoupled)
@@ -280,11 +285,6 @@
EasyMock.expectLastCall().andReturn(os);
os.write(PAYLOAD.getBytes(), 0, PAYLOAD.length());
EasyMock.expectLastCall();
-
- URL decoupledURL = null;
- if (decoupled) {
- decoupledURL = new URL(NOWHERE + "response");
- }
os.flush();
EasyMock.expectLastCall();
@@ -294,28 +294,9 @@
EasyMock.expectLastCall();
verifyHandleResponse(decoupled);
-
+
control.replay();
- Destination backChannel = null;
- AbstractHttpHandler decoupledHandler = null;
- if (decoupled) {
- decoupledEngine.verifyCallCounts(new int[]{0, 0, 0});
- backChannel = conduit.getBackChannel();
- assertNotNull("expected back channel", backChannel);
- decoupledEngine.verifyCallCounts(new int[]{1, 0, 1});
- decoupledHandler = decoupledEngine.servants.get(decoupledURL);
- assertNotNull("expected servant registered", decoupledHandler);
- MessageObserver decoupledObserver =
- ((HTTPConduit.DecoupledDestination)backChannel).getMessageObserver();
- assertSame("unexpected decoupled destination",
- observer,
- decoupledObserver);
- } else {
- backChannel = conduit.getBackChannel();
- assertNull("unexpected back channel", backChannel);
- }
-
wrappedOS.flush();
wrappedOS.flush();
wrappedOS.close();
@@ -334,13 +315,10 @@
assertSame("unexpected content", is, inMessage.getContent(InputStream.class));
if (decoupled) {
- verifyDecoupledResponse(decoupledHandler);
+ verifyDecoupledResponse(conduit);
}
conduit.close();
- if (decoupled) {
- decoupledEngine.verifyCallCounts(new int[]{1, 1, 2});
- }
finalVerify();
}
@@ -393,45 +371,19 @@
EasyMock.expectLastCall().andReturn(is);
}
- private void verifyDecoupledResponse(AbstractHttpHandler decoupledHandler)
+ private void verifyDecoupledResponse(HTTPConduit conduit)
throws IOException {
- inMessage = null;
- is = EasyMock.createMock(InputStream.class);
- os = EasyMock.createMock(OutputStream.class);
- TestHttpRequest decoupledRequest = new TestHttpRequest(is, parameters);
- TestHttpResponse decoupledResponse = new TestHttpResponse(os);
- decoupledHandler.handle("pathInContext",
- "pathParams",
- decoupledRequest,
- decoupledResponse);
- assertNotNull("expected decoupled in message", inMessage);
- assertNotNull("expected response headers",
- inMessage.get(Message.PROTOCOL_HEADERS));
+ Message incoming = new MessageImpl();
+ conduit.getDecoupledObserver().onMessage(incoming);
+ assertSame("expected pass thru onMessage() notification",
+ inMessage,
+ incoming);
assertEquals("unexpected response code",
HttpURLConnection.HTTP_OK,
inMessage.get(Message.RESPONSE_CODE));
-
- assertEquals("unexpected getInputStream count",
- 1,
- decoupledRequest.getInputStreamCallCount());
- //assertEquals("unexpected getParameters counts",
- // 1,
- // decoupledRequest.getParametersCallCount());
- assertTrue("unexpected content formats",
- inMessage.getContentFormats().contains(InputStream.class));
- InputStream decoupledIS = inMessage.getContent(InputStream.class);
- assertNotNull("unexpected content", decoupledIS);
-
- decoupledIS.close();
- assertEquals("unexpected setHandled count",
- 1,
- decoupledRequest.getHandledCallCount());
- assertEquals("unexpected setHandled count",
- 1,
- decoupledResponse.getCommitCallCount());
-
- inMessage.setContent(InputStream.class, is);
-
+ assertEquals("expected DECOUPLED_CHANNEL_MESSAGE flag set",
+ Boolean.TRUE,
+ inMessage.get(DECOUPLED_CHANNEL_MESSAGE));
}
private void finalVerify() {
@@ -443,42 +395,5 @@
static EndpointReferenceType getEPR(String s) {
return EndpointReferenceUtils.getEndpointReference(NOWHERE + s);
- }
-
- /**
- * EasyMock does not seem able to properly mock calls to ServerEngine -
- * expectations set seem to be ignored.
- */
- private class TestServerEngine implements ServerEngine {
- private int callCounts[] = {0, 0, 0};
- private Map<URL, AbstractHttpHandler> servants =
- new HashMap<URL, AbstractHttpHandler>();
-
- public void addServant(URL url, AbstractHttpHandler handler) {
- callCounts[0]++;
- servants.put(url, handler);
- }
-
- public void removeServant(URL url) {
- callCounts[1]++;
- servants.remove(url);
- }
-
- public HttpHandler getServant(URL url) {
- callCounts[2]++;
- return servants.get(url);
- }
-
- void verifyCallCounts(int expectedCallCounts[]) {
- assertEquals("unexpected addServant call count",
- expectedCallCounts[0],
- callCounts[0]);
- assertEquals("unexpected removeServant call count",
- expectedCallCounts[1],
- callCounts[1]);
- assertEquals("unexpected getServant call count",
- expectedCallCounts[2],
- callCounts[2]);
- }
}
}
Modified: incubator/cxf/trunk/rt/transports/http2/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/http2/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java?view=diff&rev=504504&r1=504503&r2=504504
==============================================================================
--- incubator/cxf/trunk/rt/transports/http2/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java (original)
+++ incubator/cxf/trunk/rt/transports/http2/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java Wed Feb 7 03:19:20 2007
@@ -19,7 +19,6 @@
package org.apache.cxf.transport.http;
-import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -36,9 +35,6 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
import org.apache.cxf.Bus;
import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.common.util.Base64Utility;
@@ -53,19 +49,19 @@
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.AbstractConduit;
-import org.apache.cxf.transport.Conduit;
import org.apache.cxf.transport.Destination;
+import org.apache.cxf.transport.DestinationFactory;
+import org.apache.cxf.transport.DestinationFactoryManager;
import org.apache.cxf.transport.MessageObserver;
import org.apache.cxf.transport.http.conduit.HTTPConduitConfigBean;
import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
import org.apache.cxf.ws.addressing.AttributedURIType;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.wsdl.EndpointReferenceUtils;
-import org.mortbay.jetty.HttpConnection;
-import org.mortbay.jetty.Request;
-import org.mortbay.jetty.handler.AbstractHandler;
import static org.apache.cxf.message.Message.DECOUPLED_CHANNEL_MESSAGE;
+
+
/**
* HTTP Conduit implementation.
*/
@@ -80,9 +76,9 @@
private URLConnectionFactory connectionFactory;
private URL url;
- private ServerEngine decoupledEngine;
- private URL decoupledURL;
- private DecoupledDestination decoupledDestination;
+ private Destination decoupledDestination;
+ private MessageObserver decoupledObserver;
+ private int decoupledDestinationRefCount;
private EndpointInfo endpointInfo;
@@ -111,7 +107,6 @@
this(b,
ei,
t,
- null,
null);
}
@@ -123,14 +118,12 @@
* @param ei the endpoint info of the initiator
* @param t the endpoint reference of the target
* @param factory the URL connection factory
- * @param eng the decoupled engine
* @throws IOException
*/
public HTTPConduit(Bus b,
EndpointInfo ei,
EndpointReferenceType t,
- URLConnectionFactory factory,
- ServerEngine eng) throws IOException {
+ URLConnectionFactory factory) throws IOException {
super(getTargetReference(ei, t));
bus = b;
endpointInfo = ei;
@@ -138,7 +131,6 @@
initConfig();
- decoupledEngine = eng;
url = t == null
? new URL(endpointInfo.getAddress())
: new URL(t.getAddress().getValue());
@@ -231,7 +223,7 @@
public synchronized Destination getBackChannel() {
if (decoupledDestination == null
&& config.getClient().getDecoupledEndpoint() != null) {
- decoupledDestination = setUpDecoupledDestination();
+ setUpDecoupledDestination();
}
return decoupledDestination;
}
@@ -255,12 +247,8 @@
// in decoupled case, close response Destination if reference count
// hits zero
//
- if (decoupledURL != null && decoupledEngine != null) {
- DecoupledHandler decoupledHandler =
- (DecoupledHandler)decoupledEngine.getServant(decoupledURL);
- if (decoupledHandler != null) {
- decoupledHandler.release();
- }
+ if (decoupledDestination != null) {
+ releaseDecoupledDestination();
}
}
@@ -358,10 +346,8 @@
/**
* Set up the decoupled Destination if necessary.
- *
- * @return an appropriate decoupled Destination
*/
- private DecoupledDestination setUpDecoupledDestination() {
+ private void setUpDecoupledDestination() {
EndpointReferenceType reference =
EndpointReferenceUtils.getEndpointReference(
config.getClient().getDecoupledEndpoint());
@@ -369,104 +355,50 @@
String decoupledAddress = reference.getAddress().getValue();
LOG.info("creating decoupled endpoint: " + decoupledAddress);
try {
- decoupledURL = new URL(decoupledAddress);
- if (decoupledEngine == null) {
- decoupledEngine =
- JettyHTTPServerEngine.getForPort(bus,
- decoupledURL.getProtocol(),
- decoupledURL.getPort());
- }
- DecoupledHandler decoupledHandler =
- (DecoupledHandler)decoupledEngine.getServant(decoupledURL);
- if (decoupledHandler == null) {
- decoupledHandler = new DecoupledHandler();
- decoupledEngine.addServant(decoupledURL, decoupledHandler);
- }
- decoupledHandler.duplicate();
+ decoupledDestination = getDestination(decoupledAddress);
+ duplicateDecoupledDestination();
} catch (Exception e) {
// REVISIT move message to localizable Messages.properties
LOG.log(Level.WARNING, "decoupled endpoint creation failed: ", e);
}
}
- return new DecoupledDestination(reference, incomingObserver);
}
-
-
/**
- * Wrapper output stream responsible for flushing headers and handling
- * the incoming HTTP-level response (not necessarily the MEP response).
+ * @param address the address
+ * @return a Destination for the address
*/
- private class WrappedOutputStream extends AbstractWrappedOutputStream {
- protected URLConnection connection;
-
- WrappedOutputStream(Message m, URLConnection c) {
- super(m);
- connection = c;
+ private Destination getDestination(String address) throws IOException {
+ Destination destination = null;
+ DestinationFactoryManager factoryManager =
+ bus.getExtension(DestinationFactoryManager.class);
+ DestinationFactory factory =
+ factoryManager.getDestinationFactoryForUri(address);
+ if (factory != null) {
+ EndpointInfo ei = new EndpointInfo();
+ ei.setAddress(address);
+ destination = factory.getDestination(ei);
+ decoupledObserver = new InterposedMessageObserver();
+ destination.setMessageObserver(decoupledObserver);
}
-
- /**
- * Perform any actions required on stream flush (freeze headers,
- * reset output stream ... etc.)
- */
- protected void doFlush() throws IOException {
- if (!alreadyFlushed()) {
- flushHeaders(outMessage);
- if (connection instanceof HttpURLConnection) {
- HttpURLConnection hc = (HttpURLConnection)connection;
- if (hc.getRequestMethod().equals("GET")) {
- return;
- }
- }
- resetOut(connection.getOutputStream(), true);
- }
- }
-
- /**
- * Perform any actions required on stream closure (handle response etc.)
- */
- protected void doClose() throws IOException {
- handleResponse();
- }
-
- protected void onWrite() throws IOException {
-
- }
-
- private void handleResponse() throws IOException {
- Exchange exchange = outMessage.getExchange();
- int responseCode = getResponseCode(connection);
- if (isOneway(exchange)
- && !isPartialResponse(connection, responseCode)) {
- // oneway operation without partial response
- connection.getInputStream().close();
- return;
- }
-
- Message inMessage = new MessageImpl();
- inMessage.setExchange(exchange);
- InputStream in = null;
- Map<String, List<String>> headers = new HashMap<String, List<String>>();
- for (String key : connection.getHeaderFields().keySet()) {
- headers.put(HttpHeaderHelper.getHeaderKey(key), connection.getHeaderFields().get(key));
- }
- inMessage.put(Message.PROTOCOL_HEADERS, headers);
- inMessage.put(Message.RESPONSE_CODE, responseCode);
- inMessage.put(Message.CONTENT_TYPE, connection.getHeaderField(HttpHeaderHelper.CONTENT_TYPE));
-
- if (connection instanceof HttpURLConnection) {
- HttpURLConnection hc = (HttpURLConnection)connection;
- in = hc.getErrorStream();
- if (null == in) {
- in = connection.getInputStream();
- }
- } else {
- in = connection.getInputStream();
- }
-
- inMessage.setContent(InputStream.class, in);
-
- incomingObserver.onMessage(inMessage);
+ return destination;
+ }
+
+ /**
+ * @return the decoupled observer
+ */
+ protected MessageObserver getDecoupledObserver() {
+ return decoupledObserver;
+ }
+
+ private synchronized void duplicateDecoupledDestination() {
+ decoupledDestinationRefCount++;
+ }
+
+ private synchronized void releaseDecoupledDestination() {
+ if (--decoupledDestinationRefCount == 0) {
+ LOG.log(Level.INFO, "shutting down decoupled destination");
+ decoupledDestination.shutdown();
}
}
@@ -489,115 +421,6 @@
&& connection.getContentLength() != 0;
}
- /**
- * Wrapper output stream responsible for commiting incoming request
- * containing a decoupled response.
- */
- private class WrapperInputStream extends FilterInputStream {
- HttpServletRequest request;
- HttpServletResponse response;
- boolean closed;
-
- WrapperInputStream(InputStream is,
- HttpServletRequest req,
- HttpServletResponse resp) {
- super(is);
- request = req;
- response = resp;
- }
-
- public void close() throws IOException {
- if (!closed) {
- closed = true;
- response.flushBuffer();
- Request baseRequest = (request instanceof Request)
- ? (Request)request : HttpConnection.getCurrentConnection().getRequest();
- baseRequest.setHandled(true);
- }
- }
- }
-
- /**
- * Represented decoupled response endpoint.
- */
- protected class DecoupledDestination implements Destination {
- protected MessageObserver decoupledMessageObserver;
- private EndpointReferenceType address;
-
- DecoupledDestination(EndpointReferenceType ref,
- MessageObserver incomingObserver) {
- address = ref;
- decoupledMessageObserver = incomingObserver;
- }
-
- public EndpointReferenceType getAddress() {
- return address;
- }
-
- public Conduit getBackChannel(Message inMessage,
- Message partialResponse,
- EndpointReferenceType addr)
- throws IOException {
- // shouldn't be called on decoupled endpoint
- return null;
- }
-
- public void shutdown() {
- // TODO Auto-generated method stub
- }
-
- public synchronized void setMessageObserver(MessageObserver observer) {
- decoupledMessageObserver = observer;
- }
-
- protected synchronized MessageObserver getMessageObserver() {
- return decoupledMessageObserver;
- }
- }
-
- /**
- * Handles incoming decoupled responses.
- */
- private class DecoupledHandler extends AbstractHandler {
- private int refCount;
-
- synchronized void duplicate() {
- refCount++;
- }
-
- synchronized void release() {
- if (--refCount == 0) {
- decoupledEngine.removeServant(decoupledURL);
- JettyHTTPServerEngine.destroyForPort(decoupledURL.getPort());
- }
- }
-
- public void handle(String targetURI,
- HttpServletRequest req,
- HttpServletResponse resp,
- int dispatch) throws IOException {
- InputStream responseStream = req.getInputStream();
- Message inMessage = new MessageImpl();
- // disposable exchange, swapped with real Exchange on correlation
- inMessage.setExchange(new ExchangeImpl());
- inMessage.put(DECOUPLED_CHANNEL_MESSAGE, Boolean.TRUE);
- // REVISIT: how to get response headers?
- //inMessage.put(Message.PROTOCOL_HEADERS, req.getXXX());
- setHeaders(inMessage);
- inMessage.put(Message.ENCODING, resp.getCharacterEncoding());
- inMessage.put(Message.CONTENT_TYPE, resp.getContentType());
- inMessage.put(Message.RESPONSE_CODE, HttpURLConnection.HTTP_OK);
- InputStream is = new WrapperInputStream(responseStream, req, resp);
- inMessage.setContent(InputStream.class, is);
-
- try {
- decoupledDestination.getMessageObserver().onMessage(inMessage);
- } finally {
- is.close();
- }
- }
- }
-
private void initConfig() {
config = new ConfigBean();
// Initialize some default values for the configuration
@@ -722,5 +545,106 @@
}
return null;
}
+ }
+
+ /**
+ * Wrapper output stream responsible for flushing headers and handling
+ * the incoming HTTP-level response (not necessarily the MEP response).
+ */
+ private class WrappedOutputStream extends AbstractWrappedOutputStream {
+ protected URLConnection connection;
+
+ WrappedOutputStream(Message m, URLConnection c) {
+ super(m);
+ connection = c;
+ }
+
+ /**
+ * Perform any actions required on stream flush (freeze headers,
+ * reset output stream ... etc.)
+ */
+ protected void doFlush() throws IOException {
+ if (!alreadyFlushed()) {
+ flushHeaders(outMessage);
+ if (connection instanceof HttpURLConnection) {
+ HttpURLConnection hc = (HttpURLConnection)connection;
+ if (hc.getRequestMethod().equals("GET")) {
+ return;
+ }
+ }
+ resetOut(connection.getOutputStream(), true);
+ }
+ }
+
+ /**
+ * Perform any actions required on stream closure (handle response etc.)
+ */
+ protected void doClose() throws IOException {
+ handleResponse();
+ }
+
+ protected void onWrite() throws IOException {
+
+ }
+
+ private void handleResponse() throws IOException {
+ Exchange exchange = outMessage.getExchange();
+ int responseCode = getResponseCode(connection);
+ if (isOneway(exchange)
+ && !isPartialResponse(connection, responseCode)) {
+ // oneway operation without partial response
+ connection.getInputStream().close();
+ return;
+ }
+
+ Message inMessage = new MessageImpl();
+ inMessage.setExchange(exchange);
+ InputStream in = null;
+ Map<String, List<String>> headers = new HashMap<String, List<String>>();
+ for (String key : connection.getHeaderFields().keySet()) {
+ headers.put(HttpHeaderHelper.getHeaderKey(key), connection.getHeaderFields().get(key));
+ }
+ inMessage.put(Message.PROTOCOL_HEADERS, headers);
+ inMessage.put(Message.RESPONSE_CODE, responseCode);
+ inMessage.put(Message.CONTENT_TYPE, connection.getHeaderField(HttpHeaderHelper.CONTENT_TYPE));
+
+ if (connection instanceof HttpURLConnection) {
+ HttpURLConnection hc = (HttpURLConnection)connection;
+ in = hc.getErrorStream();
+ if (null == in) {
+ in = connection.getInputStream();
+ }
+ } else {
+ in = connection.getInputStream();
+ }
+
+ inMessage.setContent(InputStream.class, in);
+
+ incomingObserver.onMessage(inMessage);
+ }
+ }
+
+ /**
+ * Used to set appropriate message properties, exchange etc.
+ * as required for an incoming decoupled response (as opposed
+ * to what's normally set by the Destination for an incoming
+ * request).
+ */
+ protected class InterposedMessageObserver implements MessageObserver {
+ /**
+ * Called for an incoming message.
+ *
+ * @param inMessage
+ */
+ public void onMessage(Message inMessage) {
+ // disposable exchange, swapped with real Exchange on correlation
+ inMessage.setExchange(new ExchangeImpl());
+ inMessage.put(DECOUPLED_CHANNEL_MESSAGE, Boolean.TRUE);
+ // REVISIT: how to get response headers?
+ //inMessage.put(Message.PROTOCOL_HEADERS, req.getXXX());
+ setHeaders(inMessage);
+ inMessage.put(Message.RESPONSE_CODE, HttpURLConnection.HTTP_OK);
+ incomingObserver.onMessage(inMessage);
+ }
}
}
Modified: incubator/cxf/trunk/rt/transports/http2/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/http2/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java?view=diff&rev=504504&r1=504503&r2=504504
==============================================================================
--- incubator/cxf/trunk/rt/transports/http2/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java (original)
+++ incubator/cxf/trunk/rt/transports/http2/src/test/java/org/apache/cxf/transport/http/HTTPConduitTest.java Wed Feb 7 03:19:20 2007
@@ -33,10 +33,8 @@
import java.util.List;
import java.util.Map;
-import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
-import javax.servlet.http.HttpServletResponse;
import junit.framework.TestCase;
@@ -45,17 +43,18 @@
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.service.model.EndpointInfo;
-import org.apache.cxf.transport.Conduit;
import org.apache.cxf.transport.Destination;
+import org.apache.cxf.transport.DestinationFactory;
+import org.apache.cxf.transport.DestinationFactoryManager;
import org.apache.cxf.transport.MessageObserver;
import org.apache.cxf.transport.http.conduit.HTTPConduitConfigBean;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.wsdl.EndpointReferenceUtils;
import org.easymock.classextension.EasyMock;
import org.easymock.classextension.IMocksControl;
-import org.mortbay.jetty.Handler;
-import org.mortbay.jetty.Request;
-import org.mortbay.jetty.handler.AbstractHandler;
+
+import static org.apache.cxf.message.Message.DECOUPLED_CHANNEL_MESSAGE;
+
public class HTTPConduitTest extends TestCase {
private static final String NOWHERE = "http://nada.nothing.nowhere.null/";
@@ -69,8 +68,6 @@
private MessageObserver observer;
private ServletOutputStream os;
private ServletInputStream is;
- private TestServerEngine decoupledEngine;
- //private MultiMap parameters;
private IMocksControl control;
public void setUp() throws Exception {
@@ -89,8 +86,6 @@
observer = null;
os = null;
is = null;
- //parameters = null;
- decoupledEngine = null;
}
public void testGetTarget() throws Exception {
@@ -205,23 +200,33 @@
EasyMock.expectLastCall();
}
}
-
- if (decoupled) {
- decoupledEngine = new TestServerEngine();
- //parameters = control.createMock(MultiMap.class);
- }
-
}
+ CXFBusImpl bus = new CXFBusImpl();
+ URL decoupledURL = null;
+ if (decoupled) {
+ decoupledURL = new URL(NOWHERE + "response");
+ DestinationFactoryManager mgr =
+ control.createMock(DestinationFactoryManager.class);
+ DestinationFactory factory =
+ control.createMock(DestinationFactory.class);
+ Destination destination =
+ control.createMock(Destination.class);
+
+ bus.setExtension(mgr, DestinationFactoryManager.class);
+ mgr.getDestinationFactoryForUri(decoupledURL.toString());
+ EasyMock.expectLastCall().andReturn(factory);
+ factory.getDestination(EasyMock.isA(EndpointInfo.class));
+ EasyMock.expectLastCall().andReturn(destination);
+ destination.setMessageObserver(EasyMock.isA(HTTPConduit.InterposedMessageObserver.class));
+ }
control.replay();
- CXFBusImpl bus = new CXFBusImpl();
HTTPConduit conduit = new HTTPConduit(bus,
endpointInfo,
null,
- connectionFactory,
- decoupledEngine);
+ connectionFactory);
conduit.retrieveConnectionFactory();
HTTPConduitConfigBean config = conduit.getConfig();
@@ -235,16 +240,14 @@
}
}
}
-
+
if (decoupled) {
- URL decoupledURL = null;
- if (decoupled) {
- decoupledURL = new URL(NOWHERE + "response");
- config.getClient().setDecoupledEndpoint(decoupledURL.toString());
- }
+ config.getClient().setDecoupledEndpoint(decoupledURL.toString());
+ assertNotNull("expected back channel", conduit.getBackChannel());
+ } else {
+ assertNull("unexpected back channel", conduit.getBackChannel());
}
-
observer = new MessageObserver() {
public void onMessage(Message m) {
inMessage = m;
@@ -254,19 +257,19 @@
return conduit;
}
- private void verifySentMessage(Conduit conduit, Message message)
+ private void verifySentMessage(HTTPConduit conduit, Message message)
throws IOException {
verifySentMessage(conduit, message, false);
}
- private void verifySentMessage(Conduit conduit,
+ private void verifySentMessage(HTTPConduit conduit,
Message message,
boolean expectHeaders)
throws IOException {
verifySentMessage(conduit, message, expectHeaders, false);
}
- private void verifySentMessage(Conduit conduit,
+ private void verifySentMessage(HTTPConduit conduit,
Message message,
boolean expectHeaders,
boolean decoupled)
@@ -287,11 +290,6 @@
EasyMock.expectLastCall().andReturn(os);
os.write(PAYLOAD.getBytes(), 0, PAYLOAD.length());
EasyMock.expectLastCall();
-
- URL decoupledURL = null;
- if (decoupled) {
- decoupledURL = new URL(NOWHERE + "response");
- }
os.flush();
EasyMock.expectLastCall();
@@ -303,26 +301,7 @@
verifyHandleResponse(decoupled);
control.replay();
-
- Destination backChannel = null;
- AbstractHandler decoupledHandler = null;
- if (decoupled) {
- decoupledEngine.verifyCallCounts(new int[]{0, 0, 0});
- backChannel = conduit.getBackChannel();
- assertNotNull("expected back channel", backChannel);
- decoupledEngine.verifyCallCounts(new int[]{1, 0, 1});
- decoupledHandler = decoupledEngine.servants.get(decoupledURL);
- assertNotNull("expected servant registered", decoupledHandler);
- MessageObserver decoupledObserver =
- ((HTTPConduit.DecoupledDestination)backChannel).getMessageObserver();
- assertSame("unexpected decoupled destination",
- observer,
- decoupledObserver);
- } else {
- backChannel = conduit.getBackChannel();
- assertNull("unexpected back channel", backChannel);
- }
-
+
wrappedOS.flush();
wrappedOS.flush();
wrappedOS.close();
@@ -341,13 +320,10 @@
assertSame("unexpected content", is, inMessage.getContent(InputStream.class));
if (decoupled) {
- verifyDecoupledResponse(decoupledHandler);
+ verifyDecoupledResponse(conduit);
}
conduit.close();
- if (decoupled) {
- decoupledEngine.verifyCallCounts(new int[]{1, 1, 2});
- }
finalVerify();
}
@@ -400,49 +376,19 @@
EasyMock.expectLastCall().andReturn(is);
}
- private void verifyDecoupledResponse(AbstractHandler decoupledHandler)
+ private void verifyDecoupledResponse(HTTPConduit conduit)
throws IOException {
- inMessage = null;
- is = EasyMock.createMock(ServletInputStream.class);
- os = EasyMock.createMock(ServletOutputStream.class);
- Request decoupledRequest = EasyMock.createMock(Request.class);
- decoupledRequest.getInputStream();
- EasyMock.expectLastCall().andReturn(is);
- decoupledRequest.setHandled(true);
- EasyMock.replay(decoupledRequest);
-
- HttpServletResponse decoupledResponse = EasyMock.createMock(HttpServletResponse.class);
- decoupledResponse.getCharacterEncoding();
- EasyMock.expectLastCall().andReturn("utf8");
- decoupledResponse.getContentType();
- EasyMock.expectLastCall().andReturn("test");
- decoupledResponse.flushBuffer();
- EasyMock.expectLastCall();
- EasyMock.replay(decoupledResponse);
-
- try {
- decoupledHandler.handle("pathInContext",
- decoupledRequest,
- decoupledResponse, Handler.REQUEST);
- } catch (ServletException e) {
- fail("There should not throw the serletException");
- }
- assertNotNull("expected decoupled in message", inMessage);
- assertNotNull("expected response headers",
- inMessage.get(Message.PROTOCOL_HEADERS));
+ Message incoming = new MessageImpl();
+ conduit.getDecoupledObserver().onMessage(incoming);
+ assertSame("expected pass thru onMessage() notification",
+ inMessage,
+ incoming);
assertEquals("unexpected response code",
HttpURLConnection.HTTP_OK,
inMessage.get(Message.RESPONSE_CODE));
-
- assertTrue("unexpected content formats",
- inMessage.getContentFormats().contains(InputStream.class));
- InputStream decoupledIS = inMessage.getContent(InputStream.class);
- assertNotNull("unexpected content", decoupledIS);
-
- decoupledIS.close();
-
- inMessage.setContent(InputStream.class, is);
-
+ assertEquals("expected DECOUPLED_CHANNEL_MESSAGE flag set",
+ Boolean.TRUE,
+ inMessage.get(DECOUPLED_CHANNEL_MESSAGE));
}
private void finalVerify() {
@@ -454,43 +400,5 @@
static EndpointReferenceType getEPR(String s) {
return EndpointReferenceUtils.getEndpointReference(NOWHERE + s);
- }
-
- /**
- * EasyMock does not seem able to properly mock calls to ServerEngine -
- * expectations set seem to be ignored.
- */
- private class TestServerEngine implements ServerEngine {
- private int callCounts[] = {0, 0, 0};
- private Map<URL, AbstractHandler> servants =
- new HashMap<URL, AbstractHandler>();
-
- public void addServant(URL url, AbstractHandler handler) {
- callCounts[0]++;
- servants.put(url, handler);
- }
-
- public void removeServant(URL url) {
- callCounts[1]++;
- servants.remove(url);
- }
-
- public Handler getServant(URL url) {
- callCounts[2]++;
- return servants.get(url);
- }
-
- void verifyCallCounts(int expectedCallCounts[]) {
- assertEquals("unexpected addServant call count",
- expectedCallCounts[0],
- callCounts[0]);
- assertEquals("unexpected removeServant call count",
- expectedCallCounts[1],
- callCounts[1]);
- assertEquals("unexpected getServant call count",
- expectedCallCounts[2],
- callCounts[2]);
- }
-
}
}