You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2013/11/19 03:22:09 UTC

svn commit: r1543289 - in /cxf/branches/2.6.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local: LocalConduit.java LocalDestination.java

Author: dkulp
Date: Tue Nov 19 02:22:08 2013
New Revision: 1543289

URL: http://svn.apache.org/r1543289
Log:
Merged revisions 1533037 via  git cherry-pick from
https://svn.apache.org/repos/asf/cxf/branches/2.7.x-fixes

........
  r1533037 | sergeyb | 2013-10-17 06:42:58 -0400 (Thu, 17 Oct 2013) | 17 lines

  Merged revisions 1532865,1532871,1532892 via svnmerge from
  https://svn.apache.org/repos/asf/cxf/trunk

  ........
    r1532865 | dkulp | 2013-10-16 20:14:50 +0100 (Wed, 16 Oct 2013) | 1 line

    Get empty responses being sent back with local transport
  ........
    r1532871 | dkulp | 2013-10-16 20:25:50 +0100 (Wed, 16 Oct 2013) | 1 line

    If the dispatch throws an exception (likely from the fault out chain), try to do something smart with it.
  ........
    r1532892 | sergeyb | 2013-10-16 21:28:49 +0100 (Wed, 16 Oct 2013) | 1 line

    Adding more local transport tests
  ........

........

Modified:
    cxf/branches/2.6.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java
    cxf/branches/2.6.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalDestination.java

Modified: cxf/branches/2.6.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.6.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java?rev=1543289&r1=1543288&r2=1543289&view=diff
==============================================================================
--- cxf/branches/2.6.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java (original)
+++ cxf/branches/2.6.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java Tue Nov 19 02:22:08 2013
@@ -24,6 +24,8 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PipedInputStream;
 import java.io.PipedOutputStream;
+import java.util.HashMap;
+import java.util.List;
 import java.util.concurrent.Executor;
 import java.util.logging.Logger;
 
@@ -80,7 +82,24 @@ public class LocalConduit extends Abstra
                     ex.setInMessage(inMsg);
                     inMsg.setExchange(ex);
                     ex.put(IN_EXCHANGE, exchange);
-                    destination.getMessageObserver().onMessage(inMsg);
+                    try {
+                        destination.getMessageObserver().onMessage(inMsg);
+                    } catch (Throwable t) {
+                        Message m = inMsg.getExchange().getOutFaultMessage();
+                        if (m == null) {
+                            m = inMsg.getExchange().getOutMessage();
+                        }
+                        if (m != null) {
+                            try {
+                                m.put(Message.RESPONSE_CODE, 500);
+                                m.put(Message.PROTOCOL_HEADERS, new HashMap<String, List<String>>());
+                                m.getExchange().put(Message.RESPONSE_CODE, 500);
+                                m.getContent(OutputStream.class).close();
+                            } catch (IOException e) {
+                                //ignore
+                            }
+                        }
+                    }
                 }
             };
             Executor ex = message.getExchange() != null

Modified: cxf/branches/2.6.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalDestination.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.6.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalDestination.java?rev=1543289&r1=1543288&r2=1543289&view=diff
==============================================================================
--- cxf/branches/2.6.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalDestination.java (original)
+++ cxf/branches/2.6.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalDestination.java Tue Nov 19 02:22:08 2013
@@ -70,6 +70,59 @@ public class LocalDestination extends Ab
     }
 
     class SynchronousConduit extends AbstractConduit {
+        private final class LocalDestinationOutputStream extends AbstractWrappedOutputStream {
+            private final Exchange exchange;
+            private final Message message;
+
+            private LocalDestinationOutputStream(Exchange exchange, Message message) {
+                this.exchange = exchange;
+                this.message = message;
+            }
+
+            public void close() throws IOException {
+                if (!written) {
+                    dispatchToClient(true);
+                }
+                super.close();
+            }
+
+            protected void onFirstWrite() throws IOException {
+                dispatchToClient(false);
+            }
+
+            protected void dispatchToClient(boolean empty) throws IOException {
+                final MessageImpl m = new MessageImpl();
+                localDestinationFactory.copy(message, m);
+                if (!empty) {
+                    final PipedInputStream stream = new PipedInputStream();
+                    wrappedStream = new PipedOutputStream(stream);
+                    m.setContent(InputStream.class, stream);
+                }
+
+                final Runnable receiver = new Runnable() {
+                    public void run() {                                    
+                        if (exchange != null) {
+                            exchange.setInMessage(m);
+                        }
+                        conduit.getMessageObserver().onMessage(m);
+                    }
+                };
+                Executor ex = message.getExchange() != null
+                    ? message.getExchange().get(Executor.class) : null;
+                // Need to avoid to get the SynchronousExecutor
+                if (ex == null || SynchronousExecutor.isA(ex)) {
+                    ex = localDestinationFactory.getExecutor();
+                    if (ex != null) {
+                        ex.execute(receiver);
+                    } else {
+                        new Thread(receiver).start();
+                    }
+                } else {
+                    ex.execute(receiver);
+                }
+            }
+        }
+
         private LocalConduit conduit;
 
         public SynchronousConduit(LocalConduit conduit) {
@@ -82,38 +135,7 @@ public class LocalDestination extends Ab
                 final Exchange exchange = (Exchange)message.getExchange().get(LocalConduit.IN_EXCHANGE);
 
                 AbstractWrappedOutputStream cout 
-                    = new AbstractWrappedOutputStream() {
-                        protected void onFirstWrite() throws IOException {
-                            final PipedInputStream stream = new PipedInputStream();
-                            wrappedStream = new PipedOutputStream(stream);
-
-                            final MessageImpl m = new MessageImpl();
-                            localDestinationFactory.copy(message, m);
-                            m.setContent(InputStream.class, stream);
-
-                            final Runnable receiver = new Runnable() {
-                                public void run() {                                    
-                                    if (exchange != null) {
-                                        exchange.setInMessage(m);
-                                    }
-                                    conduit.getMessageObserver().onMessage(m);
-                                }
-                            };
-                            Executor ex = message.getExchange() != null
-                                ? message.getExchange().get(Executor.class) : null;
-                            // Need to avoid to get the SynchronousExecutor
-                            if (ex == null || SynchronousExecutor.isA(ex)) {
-                                ex = localDestinationFactory.getExecutor();
-                                if (ex != null) {
-                                    ex.execute(receiver);
-                                } else {
-                                    new Thread(receiver).start();
-                                }
-                            } else {
-                                ex.execute(receiver);
-                            }
-                        }
-                    };
+                    = new LocalDestinationOutputStream(exchange, message);
                 
                 message.setContent(OutputStream.class, cout);