You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by da...@apache.org on 2007/03/27 19:42:19 UTC

svn commit: r523011 - in /incubator/cxf/trunk/rt/transports/local/src: main/java/org/apache/cxf/transport/local/ test/java/org/apache/cxf/transport/local/

Author: dandiep
Date: Tue Mar 27 10:42:18 2007
New Revision: 523011

URL: http://svn.apache.org/viewvc?view=rev&rev=523011
Log:
Allow people to directly invoke a Destination, instead of forcing them to go throughan OutputStream.

Modified:
    incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java
    incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java
    incubator/cxf/trunk/rt/transports/local/src/test/java/org/apache/cxf/transport/local/LocalTransportFactoryTest.java

Modified: incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java?view=diff&rev=523011&r1=523010&r2=523011
==============================================================================
--- incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java (original)
+++ incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java Tue Mar 27 10:42:18 2007
@@ -37,7 +37,9 @@
 public class LocalConduit extends AbstractConduit {
 
     public static final String IN_CONDUIT = LocalConduit.class.getName() + ".inConduit";
+    public static final String RESPONSE_CONDUIT = LocalConduit.class.getName() + ".inConduit";
     public static final String IN_EXCHANGE = LocalConduit.class.getName() + ".inExchange";
+    public static final String DIRECT_DISPATCH = LocalConduit.class.getName() + ".directDispatch";
 
     private static final Logger LOG = LogUtils.getL7dLogger(LocalConduit.class);
     
@@ -49,6 +51,30 @@
     }
     
     public void send(final Message message) throws IOException {
+        if (Boolean.TRUE.equals(message.get(DIRECT_DISPATCH))) {
+            dispatchDirect(message);
+        } else {
+            dispatchViaPipe(message);
+        }
+    }
+
+    private void dispatchDirect(Message message) {
+        if (destination.getMessageObserver() == null) {
+            throw new IllegalStateException("Local destination does not have a MessageObserver on address " 
+                                            + destination.getAddress().getAddress().getValue());
+        }
+        
+        message.put(IN_CONDUIT, this);
+        Exchange exchange = message.getExchange();
+        if (exchange == null) {
+            exchange = new ExchangeImpl();
+            exchange.setInMessage(message);
+        }
+        exchange.setDestination(destination);
+        destination.getMessageObserver().onMessage(message);
+    }
+
+    private void dispatchViaPipe(final Message message) throws IOException {
         final PipedInputStream stream = new PipedInputStream();
         final LocalConduit conduit = this;
         final Exchange exchange = message.getExchange();
@@ -60,15 +86,15 @@
         
         final Runnable receiver = new Runnable() {
             public void run() {
-                MessageImpl m = new MessageImpl();
-                m.setContent(InputStream.class, stream);
-                m.setDestination(destination);
-                m.put(IN_CONDUIT, conduit);
+                MessageImpl inMsg = new MessageImpl();
+                inMsg.setContent(InputStream.class, stream);
+                inMsg.setDestination(destination);
+                inMsg.put(IN_CONDUIT, conduit);
                 
                 ExchangeImpl ex = new ExchangeImpl();
-                ex.setInMessage(m);
+                ex.setInMessage(inMsg);
                 ex.put(IN_EXCHANGE, exchange);
-                destination.getMessageObserver().onMessage(m);
+                destination.getMessageObserver().onMessage(inMsg);
             }
         };
 

Modified: incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java?view=diff&rev=523011&r1=523010&r2=523011
==============================================================================
--- incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java (original)
+++ incubator/cxf/trunk/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalTransportFactory.java Tue Mar 27 10:42:18 2007
@@ -41,9 +41,12 @@
     implements DestinationFactory, ConduitInitiator {
    
     public static final String TRANSPORT_ID = "http://cxf.apache.org/transports/local";
-    
+
+    public static final String DISPATCH_DIRECT = "dispatch.direct";
+        
     private static final Logger LOG = Logger.getLogger(LocalTransportFactory.class.getName());
     private static final Set<String> URI_PREFIXES = new HashSet<String>();
+
     static {
         URI_PREFIXES.add("local://");
     }

Modified: incubator/cxf/trunk/rt/transports/local/src/test/java/org/apache/cxf/transport/local/LocalTransportFactoryTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/transports/local/src/test/java/org/apache/cxf/transport/local/LocalTransportFactoryTest.java?view=diff&rev=523011&r1=523010&r2=523011
==============================================================================
--- incubator/cxf/trunk/rt/transports/local/src/test/java/org/apache/cxf/transport/local/LocalTransportFactoryTest.java (original)
+++ incubator/cxf/trunk/rt/transports/local/src/test/java/org/apache/cxf/transport/local/LocalTransportFactoryTest.java Tue Mar 27 10:42:18 2007
@@ -19,6 +19,7 @@
 
 package org.apache.cxf.transport.local;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -32,9 +33,11 @@
 import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.Destination;
 import org.apache.cxf.transport.MessageObserver;
+import org.junit.Test;
 import org.xmlsoap.schemas.wsdl.http.AddressType;
 
 public class LocalTransportFactoryTest extends TestCase {
+    @Test
     public void testTransportFactory() throws Exception {
         LocalTransportFactory factory = new LocalTransportFactory();
         
@@ -62,12 +65,39 @@
         assertEquals("hello", obs.getResponseStream().toString());
     }
 
+    @Test
+    public void testDirectInvocation() throws Exception {
+        LocalTransportFactory factory = new LocalTransportFactory();
+        
+        EndpointInfo ei = new EndpointInfo(null, "http://schemas.xmlsoap.org/soap/http");
+        AddressType a = new AddressType();
+        a.setLocation("http://localhost/test");
+        ei.addExtensor(a);
+
+        LocalDestination d = (LocalDestination) factory.getDestination(ei);
+        d.setMessageObserver(new EchoObserver());
+        
+        // Set up a listener for the response
+        Conduit conduit = factory.getConduit(ei);
+        TestMessageObserver obs = new TestMessageObserver();
+        conduit.setMessageObserver(obs);
+        
+        MessageImpl m = new MessageImpl();
+        m.put(LocalConduit.DIRECT_DISPATCH, Boolean.TRUE);
+        m.setDestination(d);
+        m.setContent(InputStream.class, new ByteArrayInputStream("hello".getBytes()));
+        conduit.send(m);
+
+        assertEquals("hello", obs.getResponseStream().toString());
+
+    }
     static class EchoObserver implements MessageObserver {
 
         public void onMessage(Message message) {
             try {
                 Conduit backChannel = message.getDestination().getBackChannel(message, null, null);
-
+                message.remove(LocalConduit.DIRECT_DISPATCH);
+                
                 backChannel.send(message);
 
                 OutputStream out = message.getContent(OutputStream.class);
@@ -116,6 +146,7 @@
 
         public synchronized void onMessage(Message message) {
             try {
+                message.remove(LocalConduit.DIRECT_DISPATCH);
                 copy(message.getContent(InputStream.class), response, 1024);
             } catch (IOException e) {
                 e.printStackTrace();