You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2012/08/17 19:05:17 UTC

svn commit: r1374362 - in /cxf/trunk/rt: transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java

Author: dkulp
Date: Fri Aug 17 17:05:17 2012
New Revision: 1374362

URL: http://svn.apache.org/viewvc?rev=1374362&view=rev
Log:
When using multicast with async callbacks, allow multiple responses.

Modified:
    cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
    cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java

Modified: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java?rev=1374362&r1=1374361&r2=1374362&view=diff
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java (original)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java Fri Aug 17 17:05:17 2012
@@ -58,6 +58,7 @@ import org.apache.mina.transport.socket.
  */
 public class UDPConduit extends AbstractConduit {
     private static final String CXF_MESSAGE_ATTR = "CXFMessage";
+    private static final String MULTI_RESPONSE_TIMEOUT = "udp.multi.response.timeout";
     private static final String HOST_PORT = UDPConduit.class + ".host:port";
     private static final Logger LOG = LogUtils.getL7dLogger(UDPDestination.class); 
 
@@ -103,8 +104,10 @@ public class UDPConduit extends Abstract
                 });
             } else {
                 incomingObserver.onMessage(inMessage);
+                if (!message.getExchange().isSynchronous()) {
+                    message.getExchange().setInMessage(null);
+                }
             }
-            
         } else {
             IoSessionInputStream ins = message.getExchange().getInMessage().get(IoSessionInputStream.class);
             ins.setBuffer((IoBuffer)buf);
@@ -226,7 +229,7 @@ public class UDPConduit extends Abstract
             socket.setReceiveBufferSize(64 * 1024);
             socket.setBroadcast(true);
             
-            if (multicast != null) {
+            if (multicast == null) {
                 Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
                 while (interfaces.hasMoreElements()) {
                     NetworkInterface networkInterface = interfaces.nextElement();
@@ -267,9 +270,26 @@ public class UDPConduit extends Abstract
             if (!message.getExchange().isOneWay()) {
                 byte bytes[] = new byte[64 * 1024];
                 DatagramPacket p = new DatagramPacket(bytes, bytes.length);
-                socket.setSoTimeout(30000);
-                socket.receive(p);
-                dataReceived(message, IoBuffer.wrap(bytes, 0, p.getLength()), false);
+                Integer i = (Integer)message.getContextualProperty(MULTI_RESPONSE_TIMEOUT);
+                if (i == null) {
+                    socket.setSoTimeout(30000);
+                    socket.receive(p);
+                    dataReceived(message, IoBuffer.wrap(bytes, 0, p.getLength()), false);
+                } else {
+                    socket.setSoTimeout(i);
+                    boolean found = false;
+                    try {
+                        while (true) {
+                            socket.receive(p);
+                            dataReceived(message, IoBuffer.wrap(bytes, 0, p.getLength()), false);
+                            found = true;
+                        }
+                    } catch (java.net.SocketTimeoutException ex) {
+                        if (!found) {
+                            throw ex;
+                        }
+                    }
+                }
             }
             socket.close();
         }

Modified: cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java?rev=1374362&r1=1374361&r2=1374362&view=diff
==============================================================================
--- cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java (original)
+++ cxf/trunk/rt/ws/addr/src/main/java/org/apache/cxf/ws/addressing/soap/MAPCodec.java Fri Aug 17 17:05:17 2012
@@ -803,8 +803,14 @@ public class MAPCodec extends AbstractSo
                 } else if (!MessageUtils.getContextualBoolean(message,
                                           "org.apache.cxf.ws.addressing.MAPAggregator.addressingDisabled",
                                           false)) {
-                    LOG.log(Level.WARNING, "CORRELATION_FAILURE_MSG");
-                    message.getInterceptorChain().abort();
+                    //see if it can directly be correlated with the out message:
+                    AddressingProperties outp = ContextUtils.retrieveMAPs(message.getExchange().getOutMessage(),
+                                                                          false, true, false);
+                    if (outp == null 
+                        || !outp.getMessageID().getValue().equals(maps.getRelatesTo().getValue())) {
+                        LOG.log(Level.WARNING, "CORRELATION_FAILURE_MSG");
+                        message.getInterceptorChain().abort();
+                    }
                 }
             }
         } else if (maps == null && isRequestor(message)) {