You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2013/10/23 20:18:03 UTC
svn commit: r1535093 - in /cxf/trunk:
rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
services/ws-discovery/ws-discovery-api/src/test/java/org/apache/cxf/ws/discovery/WSDiscoveryClientTest.java
Author: dkulp
Date: Wed Oct 23 18:18:02 2013
New Revision: 1535093
URL: http://svn.apache.org/r1535093
Log:
If something on the ws-discovery network throws a fault, ignore and continue getting the probematches responses
Modified:
cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
cxf/trunk/services/ws-discovery/ws-discovery-api/src/test/java/org/apache/cxf/ws/discovery/WSDiscoveryClientTest.java
Modified: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java?rev=1535093&r1=1535092&r2=1535093&view=diff
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java (original)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java Wed Oct 23 18:18:02 2013
@@ -30,7 +30,11 @@ import java.net.InterfaceAddress;
import java.net.MulticastSocket;
import java.net.NetworkInterface;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -76,41 +80,56 @@ public class UDPConduit extends Abstract
connector.setHandler(new IoHandlerAdapter() {
public void messageReceived(IoSession session, Object buf) {
Message message = (Message)session.getAttribute(CXF_MESSAGE_ATTR);
- dataReceived(message, (IoBuffer)buf, true);
+ dataReceived(message, (IoBuffer)buf, true, false);
}
});
}
- private void dataReceived(Message message, IoBuffer buf, boolean async) {
- if (message.getExchange().getInMessage() == null) {
- final Message inMessage = new MessageImpl();
- inMessage.setExchange(message.getExchange());
- message.getExchange().setInMessage(inMessage);
-
- IoSessionInputStream ins = new IoSessionInputStream(buf);
- inMessage.setContent(InputStream.class, ins);
- inMessage.put(IoSessionInputStream.class, ins);
-
- if (async) {
- WorkQueueManager queuem = bus.getExtension(WorkQueueManager.class);
- WorkQueue queue = queuem.getNamedWorkQueue("udp-conduit");
- if (queue == null) {
- queue = queuem.getAutomaticWorkQueue();
- }
- queue.execute(new Runnable() {
- public void run() {
- incomingObserver.onMessage(inMessage);
+ private void dataReceived(Message message, IoBuffer buf, boolean async, boolean multi) {
+ synchronized (message.getExchange()) {
+ if (message.getExchange().getInMessage() == null) {
+ final Message inMessage = new MessageImpl();
+ IoSessionInputStream ins = new IoSessionInputStream(buf);
+ inMessage.setContent(InputStream.class, ins);
+ inMessage.put(IoSessionInputStream.class, ins);
+
+ message.getExchange().setInMessage(inMessage);
+ inMessage.setExchange(message.getExchange());
+
+ Map<String, Object> mp = null;
+ if (multi) {
+ mp = new HashMap<String, Object>(message.getExchange());
+ }
+
+ if (async) {
+ WorkQueueManager queuem = bus.getExtension(WorkQueueManager.class);
+ WorkQueue queue = queuem.getNamedWorkQueue("udp-conduit");
+ if (queue == null) {
+ queue = queuem.getAutomaticWorkQueue();
+ }
+ queue.execute(new Runnable() {
+ public void run() {
+ incomingObserver.onMessage(inMessage);
+ }
+ });
+ } else {
+ incomingObserver.onMessage(inMessage);
+ if (!message.getExchange().isSynchronous() || multi) {
+ message.getExchange().setInMessage(null);
+ message.getExchange().setInFaultMessage(null);
}
- });
- } else {
- incomingObserver.onMessage(inMessage);
- if (!message.getExchange().isSynchronous()) {
- message.getExchange().setInMessage(null);
}
+ if (mp != null) {
+ Collection<String> s = new ArrayList<String>(message.getExchange().keySet());
+ for (String s2 : s) {
+ message.getExchange().remove(s2);
+ }
+ message.getExchange().putAll(mp);
+ }
+ } else {
+ IoSessionInputStream ins = message.getExchange().getInMessage().get(IoSessionInputStream.class);
+ ins.setBuffer((IoBuffer)buf);
}
- } else {
- IoSessionInputStream ins = message.getExchange().getInMessage().get(IoSessionInputStream.class);
- ins.setBuffer((IoBuffer)buf);
}
}
@@ -280,14 +299,14 @@ public class UDPConduit extends Abstract
if (i == null || i <= 0 || message.getExchange().isSynchronous()) {
socket.setSoTimeout(30000);
socket.receive(p);
- dataReceived(message, IoBuffer.wrap(bytes, 0, p.getLength()), false);
+ dataReceived(message, IoBuffer.wrap(bytes, 0, p.getLength()), false, false);
} else {
socket.setSoTimeout(i);
boolean found = false;
try {
while (true) {
socket.receive(p);
- dataReceived(message, IoBuffer.wrap(bytes, 0, p.getLength()), false);
+ dataReceived(message, IoBuffer.wrap(bytes, 0, p.getLength()), false, true);
found = true;
}
} catch (java.net.SocketTimeoutException ex) {
Modified: cxf/trunk/services/ws-discovery/ws-discovery-api/src/test/java/org/apache/cxf/ws/discovery/WSDiscoveryClientTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/ws-discovery/ws-discovery-api/src/test/java/org/apache/cxf/ws/discovery/WSDiscoveryClientTest.java?rev=1535093&r1=1535092&r2=1535093&view=diff
==============================================================================
--- cxf/trunk/services/ws-discovery/ws-discovery-api/src/test/java/org/apache/cxf/ws/discovery/WSDiscoveryClientTest.java (original)
+++ cxf/trunk/services/ws-discovery/ws-discovery-api/src/test/java/org/apache/cxf/ws/discovery/WSDiscoveryClientTest.java Wed Oct 23 18:18:02 2013
@@ -52,6 +52,15 @@ public final class WSDiscoveryClientTest
Endpoint ep = Endpoint.publish("http://localhost:51919/Foo/Snarf", new FooImpl());
WSDiscoveryServiceImpl service = new WSDiscoveryServiceImpl(bus);
service.startup();
+
+ //this service will just generate an error. However, the probes should still
+ //work to probe the above stuff.
+ WSDiscoveryServiceImpl s2 = new WSDiscoveryServiceImpl() {
+ public ProbeMatchesType handleProbe(ProbeType pt) {
+ throw new RuntimeException("Error!!!");
+ }
+ };
+ s2.startup();
HelloType h = service.register(ep.getEndpointReference());
@@ -66,7 +75,7 @@ public final class WSDiscoveryClientTest
ProbeType pt = new ProbeType();
ScopesType scopes = new ScopesType();
pt.setScopes(scopes);
- ProbeMatchesType pmts = c.probe(pt);
+ ProbeMatchesType pmts = c.probe(pt, 1000);
System.out.println("2");
if (pmts != null) {
for (ProbeMatchType pmt : pmts.getProbeMatch()) {
@@ -75,7 +84,13 @@ public final class WSDiscoveryClientTest
System.out.println(pmt.getXAddrs());
}
}
+ if (pmts.getProbeMatch().size() == 0) {
+ System.exit(0);
+ }
pmts = c.probe(pt);
+
+ System.out.println("Size:" + pmts.getProbeMatch().size());
+
System.out.println("3");
W3CEndpointReference ref = null;