You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by da...@apache.org on 2007/03/27 19:42:19 UTC
svn commit: r523011 - in /incubator/cxf/trunk/rt/transports/local/src:
main/java/org/apache/cxf/transport/local/
test/java/org/apache/cxf/transport/local/
Author: dandiep
Date: Tue Mar 27 10:42:18 2007
New Revision: 523011
URL: http://svn.apache.org/viewvc?view=rev&rev=523011
Log:
Allow people to directly invoke a Destination, instead of forcing them to go throughan OutputStream.
Modified:
incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java
incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java
incubator/cxf/trunk/rt/transports/local/src/test/java/org/apache/cxf/transport/local/LocalTransportFactoryTest.java
Modified: incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java?view=diff&rev=523011&r1=523010&r2=523011
==============================================================================
--- incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java (original)
+++ incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java Tue Mar 27 10:42:18 2007
@@ -37,7 +37,9 @@
public class LocalConduit extends AbstractConduit {
public static final String IN_CONDUIT = LocalConduit.class.getName() + ".inConduit";
+ public static final String RESPONSE_CONDUIT = LocalConduit.class.getName() + ".inConduit";
public static final String IN_EXCHANGE = LocalConduit.class.getName() + ".inExchange";
+ public static final String DIRECT_DISPATCH = LocalConduit.class.getName() + ".directDispatch";
private static final Logger LOG = LogUtils.getL7dLogger(LocalConduit.class);
@@ -49,6 +51,30 @@
}
public void send(final Message message) throws IOException {
+ if (Boolean.TRUE.equals(message.get(DIRECT_DISPATCH))) {
+ dispatchDirect(message);
+ } else {
+ dispatchViaPipe(message);
+ }
+ }
+
+ private void dispatchDirect(Message message) {
+ if (destination.getMessageObserver() == null) {
+ throw new IllegalStateException("Local destination does not have a MessageObserver on address "
+ + destination.getAddress().getAddress().getValue());
+ }
+
+ message.put(IN_CONDUIT, this);
+ Exchange exchange = message.getExchange();
+ if (exchange == null) {
+ exchange = new ExchangeImpl();
+ exchange.setInMessage(message);
+ }
+ exchange.setDestination(destination);
+ destination.getMessageObserver().onMessage(message);
+ }
+
+ private void dispatchViaPipe(final Message message) throws IOException {
final PipedInputStream stream = new PipedInputStream();
final LocalConduit conduit = this;
final Exchange exchange = message.getExchange();
@@ -60,15 +86,15 @@
final Runnable receiver = new Runnable() {
public void run() {
- MessageImpl m = new MessageImpl();
- m.setContent(InputStream.class, stream);
- m.setDestination(destination);
- m.put(IN_CONDUIT, conduit);
+ MessageImpl inMsg = new MessageImpl();
+ inMsg.setContent(InputStream.class, stream);
+ inMsg.setDestination(destination);
+ inMsg.put(IN_CONDUIT, conduit);
ExchangeImpl ex = new ExchangeImpl();
- ex.setInMessage(m);
+ ex.setInMessage(inMsg);
ex.put(IN_EXCHANGE, exchange);
- destination.getMessageObserver().onMessage(m);
+ destination.getMessageObserver().onMessage(inMsg);
}
};
Modified: incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java?view=diff&rev=523011&r1=523010&r2=523011
==============================================================================
--- incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java (original)
+++ incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java Tue Mar 27 10:42:18 2007
@@ -41,9 +41,12 @@
implements DestinationFactory, ConduitInitiator {
public static final String TRANSPORT_ID = "http://cxf.apache.org/transports/local";
-
+
+ public static final String DISPATCH_DIRECT = "dispatch.direct";
+
private static final Logger LOG = Logger.getLogger(LocalTransportFactory.class.getName());
private static final Set<String> URI_PREFIXES = new HashSet<String>();
+
static {
URI_PREFIXES.add("local://");
}
Modified: incubator/cxf/trunk/rt/transports/local/src/test/java/org/apache/cxf/transport/local/LocalTransportFactoryTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/local/src/test/java/org/apache/cxf/transport/local/LocalTransportFactoryTest.java?view=diff&rev=523011&r1=523010&r2=523011
==============================================================================
--- incubator/cxf/trunk/rt/transports/local/src/test/java/org/apache/cxf/transport/local/LocalTransportFactoryTest.java (original)
+++ incubator/cxf/trunk/rt/transports/local/src/test/java/org/apache/cxf/transport/local/LocalTransportFactoryTest.java Tue Mar 27 10:42:18 2007
@@ -19,6 +19,7 @@
package org.apache.cxf.transport.local;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -32,9 +33,11 @@
import org.apache.cxf.transport.Conduit;
import org.apache.cxf.transport.Destination;
import org.apache.cxf.transport.MessageObserver;
+import org.junit.Test;
import org.xmlsoap.schemas.wsdl.http.AddressType;
public class LocalTransportFactoryTest extends TestCase {
+ @Test
public void testTransportFactory() throws Exception {
LocalTransportFactory factory = new LocalTransportFactory();
@@ -62,12 +65,39 @@
assertEquals("hello", obs.getResponseStream().toString());
}
+ @Test
+ public void testDirectInvocation() throws Exception {
+ LocalTransportFactory factory = new LocalTransportFactory();
+
+ EndpointInfo ei = new EndpointInfo(null, "http://schemas.xmlsoap.org/soap/http");
+ AddressType a = new AddressType();
+ a.setLocation("http://localhost/test");
+ ei.addExtensor(a);
+
+ LocalDestination d = (LocalDestination) factory.getDestination(ei);
+ d.setMessageObserver(new EchoObserver());
+
+ // Set up a listener for the response
+ Conduit conduit = factory.getConduit(ei);
+ TestMessageObserver obs = new TestMessageObserver();
+ conduit.setMessageObserver(obs);
+
+ MessageImpl m = new MessageImpl();
+ m.put(LocalConduit.DIRECT_DISPATCH, Boolean.TRUE);
+ m.setDestination(d);
+ m.setContent(InputStream.class, new ByteArrayInputStream("hello".getBytes()));
+ conduit.send(m);
+
+ assertEquals("hello", obs.getResponseStream().toString());
+
+ }
static class EchoObserver implements MessageObserver {
public void onMessage(Message message) {
try {
Conduit backChannel = message.getDestination().getBackChannel(message, null, null);
-
+ message.remove(LocalConduit.DIRECT_DISPATCH);
+
backChannel.send(message);
OutputStream out = message.getContent(OutputStream.class);
@@ -116,6 +146,7 @@
public synchronized void onMessage(Message message) {
try {
+ message.remove(LocalConduit.DIRECT_DISPATCH);
copy(message.getContent(InputStream.class), response, 1024);
} catch (IOException e) {
e.printStackTrace();