You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2013/11/19 03:22:09 UTC
svn commit: r1543289 - in
/cxf/branches/2.6.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local:
LocalConduit.java LocalDestination.java
Author: dkulp
Date: Tue Nov 19 02:22:08 2013
New Revision: 1543289
URL: http://svn.apache.org/r1543289
Log:
Merged revisions 1533037 via git cherry-pick from
https://svn.apache.org/repos/asf/cxf/branches/2.7.x-fixes
........
r1533037 | sergeyb | 2013-10-17 06:42:58 -0400 (Thu, 17 Oct 2013) | 17 lines
Merged revisions 1532865,1532871,1532892 via svnmerge from
https://svn.apache.org/repos/asf/cxf/trunk
........
r1532865 | dkulp | 2013-10-16 20:14:50 +0100 (Wed, 16 Oct 2013) | 1 line
Get empty responses being sent back with local transport
........
r1532871 | dkulp | 2013-10-16 20:25:50 +0100 (Wed, 16 Oct 2013) | 1 line
If the dispatch throws an excpetion (likely from the fault out chain), try to do something smart with it.
........
r1532892 | sergeyb | 2013-10-16 21:28:49 +0100 (Wed, 16 Oct 2013) | 1 line
Adding more local transport tests
........
........
Modified:
cxf/branches/2.6.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java
cxf/branches/2.6.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalDestination.java
Modified: cxf/branches/2.6.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.6.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java?rev=1543289&r1=1543288&r2=1543289&view=diff
==============================================================================
--- cxf/branches/2.6.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java (original)
+++ cxf/branches/2.6.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalConduit.java Tue Nov 19 02:22:08 2013
@@ -24,6 +24,8 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
+import java.util.HashMap;
+import java.util.List;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
@@ -80,7 +82,24 @@ public class LocalConduit extends Abstra
ex.setInMessage(inMsg);
inMsg.setExchange(ex);
ex.put(IN_EXCHANGE, exchange);
- destination.getMessageObserver().onMessage(inMsg);
+ try {
+ destination.getMessageObserver().onMessage(inMsg);
+ } catch (Throwable t) {
+ Message m = inMsg.getExchange().getOutFaultMessage();
+ if (m == null) {
+ m = inMsg.getExchange().getOutMessage();
+ }
+ if (m != null) {
+ try {
+ m.put(Message.RESPONSE_CODE, 500);
+ m.put(Message.PROTOCOL_HEADERS, new HashMap<String, List<String>>());
+ m.getExchange().put(Message.RESPONSE_CODE, 500);
+ m.getContent(OutputStream.class).close();
+ } catch (IOException e) {
+ //ignore
+ }
+ }
+ }
}
};
Executor ex = message.getExchange() != null
Modified: cxf/branches/2.6.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalDestination.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.6.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalDestination.java?rev=1543289&r1=1543288&r2=1543289&view=diff
==============================================================================
--- cxf/branches/2.6.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalDestination.java (original)
+++ cxf/branches/2.6.x-fixes/rt/transports/local/src/main/java/org/apache/cxf/transport/local/LocalDestination.java Tue Nov 19 02:22:08 2013
@@ -70,6 +70,59 @@ public class LocalDestination extends Ab
}
class SynchronousConduit extends AbstractConduit {
+ private final class LocalDestinationOutputStream extends AbstractWrappedOutputStream {
+ private final Exchange exchange;
+ private final Message message;
+
+ private LocalDestinationOutputStream(Exchange exchange, Message message) {
+ this.exchange = exchange;
+ this.message = message;
+ }
+
+ public void close() throws IOException {
+ if (!written) {
+ dispatchToClient(true);
+ }
+ super.close();
+ }
+
+ protected void onFirstWrite() throws IOException {
+ dispatchToClient(false);
+ }
+
+ protected void dispatchToClient(boolean empty) throws IOException {
+ final MessageImpl m = new MessageImpl();
+ localDestinationFactory.copy(message, m);
+ if (!empty) {
+ final PipedInputStream stream = new PipedInputStream();
+ wrappedStream = new PipedOutputStream(stream);
+ m.setContent(InputStream.class, stream);
+ }
+
+ final Runnable receiver = new Runnable() {
+ public void run() {
+ if (exchange != null) {
+ exchange.setInMessage(m);
+ }
+ conduit.getMessageObserver().onMessage(m);
+ }
+ };
+ Executor ex = message.getExchange() != null
+ ? message.getExchange().get(Executor.class) : null;
+ // Need to avoid to get the SynchronousExecutor
+ if (ex == null || SynchronousExecutor.isA(ex)) {
+ ex = localDestinationFactory.getExecutor();
+ if (ex != null) {
+ ex.execute(receiver);
+ } else {
+ new Thread(receiver).start();
+ }
+ } else {
+ ex.execute(receiver);
+ }
+ }
+ }
+
private LocalConduit conduit;
public SynchronousConduit(LocalConduit conduit) {
@@ -82,38 +135,7 @@ public class LocalDestination extends Ab
final Exchange exchange = (Exchange)message.getExchange().get(LocalConduit.IN_EXCHANGE);
AbstractWrappedOutputStream cout
- = new AbstractWrappedOutputStream() {
- protected void onFirstWrite() throws IOException {
- final PipedInputStream stream = new PipedInputStream();
- wrappedStream = new PipedOutputStream(stream);
-
- final MessageImpl m = new MessageImpl();
- localDestinationFactory.copy(message, m);
- m.setContent(InputStream.class, stream);
-
- final Runnable receiver = new Runnable() {
- public void run() {
- if (exchange != null) {
- exchange.setInMessage(m);
- }
- conduit.getMessageObserver().onMessage(m);
- }
- };
- Executor ex = message.getExchange() != null
- ? message.getExchange().get(Executor.class) : null;
- // Need to avoid to get the SynchronousExecutor
- if (ex == null || SynchronousExecutor.isA(ex)) {
- ex = localDestinationFactory.getExecutor();
- if (ex != null) {
- ex.execute(receiver);
- } else {
- new Thread(receiver).start();
- }
- } else {
- ex.execute(receiver);
- }
- }
- };
+ = new LocalDestinationOutputStream(exchange, message);
message.setContent(OutputStream.class, cout);