You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/08/17 19:05:17 UTC
svn commit: r1374362 - in /cxf/trunk/rt:
transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java
Author: dkulp
Date: Fri Aug 17 17:05:17 2012
New Revision: 1374362
URL: http://svn.apache.org/viewvc?rev=1374362&view=rev
Log:
When using multicast with async callbacks, allow multiple responses.
Modified:
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java
Modified: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java?rev=1374362&r1=1374361&r2=1374362&view=diff
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java (original)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java Fri Aug 17 17:05:17 2012
@@ -58,6 +58,7 @@ import org.apache.mina.transport.socket.
*/
public class UDPConduit extends AbstractConduit {
private static final String CXF_MESSAGE_ATTR = "CXFMessage";
+ private static final String MULTI_RESPONSE_TIMEOUT = "udp.multi.response.timeout";
private static final String HOST_PORT = UDPConduit.class + ".host:port";
private static final Logger LOG = LogUtils.getL7dLogger(UDPDestination.class);
@@ -103,8 +104,10 @@ public class UDPConduit extends Abstract
});
} else {
incomingObserver.onMessage(inMessage);
+ if (!message.getExchange().isSynchronous()) {
+ message.getExchange().setInMessage(null);
+ }
}
-
} else {
IoSessionInputStream ins = message.getExchange().getInMessage().get(IoSessionInputStream.class);
ins.setBuffer((IoBuffer)buf);
@@ -226,7 +229,7 @@ public class UDPConduit extends Abstract
socket.setReceiveBufferSize(64 * 1024);
socket.setBroadcast(true);
- if (multicast != null) {
+ if (multicast == null) {
Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
while (interfaces.hasMoreElements()) {
NetworkInterface networkInterface = interfaces.nextElement();
@@ -267,9 +270,26 @@ public class UDPConduit extends Abstract
if (!message.getExchange().isOneWay()) {
byte bytes[] = new byte[64 * 1024];
DatagramPacket p = new DatagramPacket(bytes, bytes.length);
- socket.setSoTimeout(30000);
- socket.receive(p);
- dataReceived(message, IoBuffer.wrap(bytes, 0, p.getLength()), false);
+ Integer i = (Integer)message.getContextualProperty(MULTI_RESPONSE_TIMEOUT);
+ if (i == null) {
+ socket.setSoTimeout(30000);
+ socket.receive(p);
+ dataReceived(message, IoBuffer.wrap(bytes, 0, p.getLength()), false);
+ } else {
+ socket.setSoTimeout(i);
+ boolean found = false;
+ try {
+ while (true) {
+ socket.receive(p);
+ dataReceived(message, IoBuffer.wrap(bytes, 0, p.getLength()), false);
+ found = true;
+ }
+ } catch (java.net.SocketTimeoutException ex) {
+ if (!found) {
+ throw ex;
+ }
+ }
+ }
}
socket.close();
}
Modified: cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java?rev=1374362&r1=1374361&r2=1374362&view=diff
==============================================================================
--- cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java (original)
+++ cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java Fri Aug 17 17:05:17 2012
@@ -803,8 +803,14 @@ public class MAPCodec extends AbstractSo
} else if (!MessageUtils.getContextualBoolean(message,
"org.apache.cxf.ws.addressing.MAPAggregator.addressingDisabled",
false)) {
- LOG.log(Level.WARNING, "CORRELATION_FAILURE_MSG");
- message.getInterceptorChain().abort();
+ //see if it can directly be correlated with the out message:
+ AddressingProperties outp = ContextUtils.retrieveMAPs(message.getExchange().getOutMessage(),
+ false, true, false);
+ if (outp == null
+ || !outp.getMessageID().getValue().equals(maps.getRelatesTo().getValue())) {
+ LOG.log(Level.WARNING, "CORRELATION_FAILURE_MSG");
+ message.getInterceptorChain().abort();
+ }
}
}
} else if (maps == null && isRequestor(message)) {