You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by an...@apache.org on 2013/10/09 14:35:05 UTC
svn commit: r1530583 - in /tomee/tomee/trunk/server:
openejb-client/src/main/java/org/apache/openejb/client/
openejb-multicast/src/main/java/org/apache/openejb/server/discovery/
openejb-multicast/src/test/java/org/apache/openejb/server/discovery/
Author: andygumbrecht
Date: Wed Oct 9 12:35:05 2013
New Revision: 1530583
URL: http://svn.apache.org/r1530583
Log:
Fix https://issues.apache.org/jira/browse/OPENEJB-2043
Modified:
tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
Modified: tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java?rev=1530583&r1=1530582&r2=1530583&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java (original)
+++ tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java Wed Oct 9 12:35:05 2013
@@ -35,6 +35,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.logging.Level;
import java.util.logging.Logger;
/**
@@ -66,8 +67,24 @@ public class MulticastPulseClient extend
private static final int TTL = Integer.parseInt(System.getProperty(ORG_APACHE_OPENEJB_MULTIPULSE_TTL, "32"));
private static final int LIMIT = Integer.parseInt(System.getProperty(ORG_APACHE_OPENEJB_MULTIPULSE_URI_LIMIT, "50000"));
private static final Map<URI, Set<URI>> knownUris = new HashMap<URI, Set<URI>>();
- private static final NetworkInterface[] interfaces = getNetworkInterfaces();
- private static final ExecutorService executor = Executors.newFixedThreadPool(interfaces.length + 1);
+ private static NetworkInterface[] interfaces = getNetworkInterfaces();
+ private static ExecutorService executor = null;
+
+ private static synchronized NetworkInterface[] getInterfaces() {
+ if (null == interfaces) {
+ interfaces = getNetworkInterfaces();
+ }
+
+ return interfaces;
+ }
+
+ private static synchronized ExecutorService getExecutorService() {
+ if (null == executor) {
+ executor = Executors.newFixedThreadPool(getInterfaces().length + 2);
+ }
+
+ return executor;
+ }
/**
* @param uri Connection URI
@@ -117,7 +134,7 @@ public class MulticastPulseClient extend
try {
//Strip serverhost and group and try to connect
return ConnectionManager.getConnection(URI.create(URI.create(serviceURI.getSchemeSpecificPart()).getSchemeSpecificPart()));
- } catch (Throwable e) {
+ } catch (Exception e) {
uriSet.remove(serviceURI);
}
}
@@ -200,7 +217,7 @@ public class MulticastPulseClient extend
//Compare URI hosts
int i = compare(u1.getHost(), u2.getHost());
- if (i == 0) {
+ if (i != 0) {
i = uri1.compareTo(uri2);
}
@@ -223,7 +240,7 @@ public class MulticastPulseClient extend
} else if (0 != h1.compareTo(h2)) {
return -1;
}
- } catch (Throwable e) {
+ } catch (Exception e) {
//Ignore
}
@@ -240,7 +257,7 @@ public class MulticastPulseClient extend
for (final MulticastSocket socket : clientSocketsFinal) {
- futures.add(executor.submit(new Runnable() {
+ futures.add(getExecutorService().submit(new Runnable() {
@Override
public void run() {
try {
@@ -258,6 +275,10 @@ public class MulticastPulseClient extend
int len = response.getLength();
if (len > 2048) {
+
+ if (log.isLoggable(Level.FINE)) {
+ log.log(Level.FINE, "Truncating multipulse length {0} to 2048", new Object[]{len});
+ }
len = 2048;
}
@@ -288,7 +309,7 @@ public class MulticastPulseClient extend
final URI serviceUri;
try {
serviceUri = URI.create(svc);
- } catch (Throwable e) {
+ } catch (Exception e) {
continue;
}
@@ -327,7 +348,7 @@ public class MulticastPulseClient extend
//Just add as is
set.add(URI.create(svcfull));
}
- } catch (Throwable e) {
+ } catch (Exception e) {
//Ignore
} finally {
setLock.unlock();
@@ -337,19 +358,19 @@ public class MulticastPulseClient extend
}
}
- } catch (Throwable e) {
+ } catch (Exception e) {
//Ignore
}
}
} finally {
try {
socket.leaveGroup(ia);
- } catch (Throwable e) {
+ } catch (Exception e) {
//Ignore
}
try {
socket.close();
- } catch (Throwable e) {
+ } catch (Exception e) {
//Ignore
}
}
@@ -361,8 +382,9 @@ public class MulticastPulseClient extend
//Give listener threads a reasonable amount of time to start
if (latchListeners.await(5, TimeUnit.SECONDS)) {
- //Start pulsing request every 20ms - This will ensure we have at least 2 pulses within our minimum timeout
- futures.add(0, executor.submit(new Runnable() {
+ //Start pulsing client request every 10ms - This will ensure we have at least 4 client pulses within our minimum timeout
+ //This pulse is designed to tell a listening server to wake up and pulse back a response
+ futures.add(0, getExecutorService().submit(new Runnable() {
@Override
public void run() {
while (running.get()) {
@@ -372,7 +394,7 @@ public class MulticastPulseClient extend
if (running.get()) {
try {
socket.send(request);
- } catch (Throwable e) {
+ } catch (Exception e) {
//Ignore
}
} else {
@@ -382,7 +404,7 @@ public class MulticastPulseClient extend
if (running.get()) {
try {
- Thread.sleep(20);
+ Thread.sleep(10);
} catch (InterruptedException e) {
break;
}
@@ -414,12 +436,12 @@ public class MulticastPulseClient extend
try {
socket.leaveGroup(ia);
- } catch (Throwable e) {
+ } catch (Exception e) {
//Ignore
}
try {
socket.close();
- } catch (Throwable e) {
+ } catch (Exception e) {
//Ignore
}
}
@@ -430,7 +452,7 @@ public class MulticastPulseClient extend
for (final Future future : futures) {
try {
future.get();
- } catch (Throwable e) {
+ } catch (Exception e) {
//Ignore
}
}
@@ -446,12 +468,12 @@ public class MulticastPulseClient extend
try {
socket.leaveGroup(ia);
- } catch (Throwable e) {
+ } catch (Exception e) {
//Ignore
}
try {
socket.close();
- } catch (Throwable e) {
+ } catch (Exception e) {
//Ignore
}
}
@@ -501,7 +523,7 @@ public class MulticastPulseClient extend
final ArrayList<MulticastSocket> list = new ArrayList<MulticastSocket>();
- for (final NetworkInterface ni : interfaces) {
+ for (final NetworkInterface ni : getInterfaces()) {
MulticastSocket ms = null;
@@ -518,12 +540,12 @@ public class MulticastPulseClient extend
list.add(ms);
- } catch (Throwable e) {
+ } catch (Exception e) {
if (null != ms) {
try {
ms.close();
- } catch (Throwable t) {
+ } catch (Exception t) {
//Ignore
}
}
@@ -614,11 +636,14 @@ public class MulticastPulseClient extend
Set<URI> uriSet = null;
try {
uriSet = MulticastPulseClient.discoverURIs(discover, new HashSet<String>(Arrays.asList("ejbd", "ejbds", "http", "https")), mchost, mcport, timeout);
- } catch (Throwable e) {
+ } catch (Exception e) {
System.err.println(e.getMessage());
}
- if (uriSet != null && uriSet.size() > 0) {
+ final int size = uriSet.size();
+ if (uriSet != null && size > 0) {
+
+ final int st = (timeout / size);
for (final URI uri : uriSet) {
@@ -636,22 +661,24 @@ public class MulticastPulseClient extend
continue;
}
+ System.out.print(server + ":" + group + " - " + uriSub.toASCIIString() + " is reachable: ");
+
boolean b = false;
final Socket s = new Socket();
try {
- s.connect(new InetSocketAddress(host, port), 500);
+ s.connect(new InetSocketAddress(host, port), st);
b = true;
- } catch (Throwable e) {
+ } catch (Exception e) {
//Ignore
} finally {
try {
s.close();
- } catch (Throwable e) {
+ } catch (Exception e) {
//Ignore
}
}
- System.out.println(server + ":" + group + " - " + uriSub.toASCIIString() + " is reachable: " + b);
+ System.out.println(b);
}
} else {
System.out.println("### Failed to discover server: " + discover);
Modified: tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java?rev=1530583&r1=1530582&r2=1530583&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java (original)
+++ tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java Wed Oct 9 12:35:05 2013
@@ -59,7 +59,7 @@ public class MulticastPulseAgent impleme
private static final Logger log = Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("discovery").createChild("multipulse"), MulticastPulseAgent.class);
private static final NetworkInterface[] interfaces = getNetworkInterfaces();
- private static final ExecutorService executor = Executors.newFixedThreadPool(interfaces.length + 1);
+ private static final ExecutorService executor = Executors.newFixedThreadPool((interfaces.length + 2) * 2);
private static final Charset UTF8 = Charset.forName("UTF-8");
private static final int TTL = Integer.parseInt(System.getProperty("org.apache.openejb.multipulse.ttl", "32"));
@@ -82,9 +82,9 @@ public class MulticastPulseAgent impleme
private boolean loopbackOnly = true;
/**
- * This agent listens for a client pulse on a defined multicast channel.
+ * This agent listens for client pulses on a defined multicast channel.
* On receipt of a valid pulse the agent responds with its own pulse for
- * a defined amount of time. A client can deliver a pulse as often as
+ * a defined amount of time and rate. A client can deliver a pulse as often as
* required until it is happy of the server response.
* <p/>
* Both server and client deliver crafted information payloads.
@@ -112,7 +112,7 @@ public class MulticastPulseAgent impleme
log.warning("Invalid ignore parameter. Should be a lowercase single or comma seperated list like: ignore=host1,host2");
}
- this.multicast = p.getProperty("bind", this.multicast);
+ this.multicast = o.get("bind", this.multicast);
this.port = o.get("port", this.port);
this.group = o.get("group", this.group);
@@ -225,6 +225,9 @@ public class MulticastPulseAgent impleme
}
final CountDownLatch latch = new CountDownLatch(this.sockets.length);
+ final String mpg = MulticastPulseAgent.this.group;
+ final boolean isLoopBackOnly = MulticastPulseAgent.this.loopbackOnly;
+ final DatagramPacket mpr = MulticastPulseAgent.this.response;
for (final MulticastSocket socket : this.sockets) {
@@ -243,34 +246,67 @@ public class MulticastPulseAgent impleme
if (null != sa) {
- String s = new String(request.getData(), 0, request.getLength());
+ final String req = new String(request.getData(), 0, request.getLength());
- if (s.startsWith(CLIENT)) {
-
- s = (s.replace(CLIENT, ""));
-
- final String client = ((InetSocketAddress) sa).getAddress().getHostAddress();
-
- if (MulticastPulseAgent.this.group.equals(s) || "*".equals(s)) {
-
- if (MulticastPulseAgent.this.loopbackOnly) {
- //We only have local services, so make sure the request is from a local source else ignore it
- if (!MulticastPulseAgent.isLocalAddress(client, false)) {
- log.debug(String.format("Ignoring remote client %1$s pulse request for group: %2$s - No remote services available", client, s));
- continue;
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+
+ String s = req;
+
+ if (s.startsWith(CLIENT)) {
+
+ s = (s.replace(CLIENT, ""));
+
+ if (mpg.equals(s) || "*".equals(s)) {
+
+ final String client = ((InetSocketAddress) sa).getAddress().getHostAddress();
+
+ if (isLoopBackOnly) {
+ //We only have local services, so make sure the request is from a local source else ignore it
+ if (!MulticastPulseAgent.isLocalAddress(client, false)) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Ignoring remote client %1$s pulse request for group: %2$s - No remote services available",
+ client,
+ s));
+ }
+ return;
+ }
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Answering client %1$s pulse request for group: %2$s", client, s));
+ }
+
+ //This is a valid client request for the server to respond on the same channel.
+ //Because multicast is not guaranteed we will send 3 responses per valid request at 10ms intervals.
+ for (int i = 0; i < 3; i++) {
+
+ try {
+ socket.send(mpr);
+ } catch (Exception e) {
+ if (log.isDebugEnabled()) {
+ log.debug("MulticastPulseAgent client error: " + e.getMessage(), e);
+ }
+ }
+
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
}
}
-
- log.debug(String.format("Answering client %1$s pulse request for group: %2$s", client, s));
- socket.send(MulticastPulseAgent.this.response);
- } else {
- log.debug(String.format("Ignoring client %1$s pulse request for group: %2$s", client, s));
}
- }
+ });
+
}
- } catch (Throwable e) {
- //Ignore
+ } catch (Exception e) {
+ if (log.isDebugEnabled()) {
+ log.debug("MulticastPulseAgent request error: " + e.getMessage(), e);
+ }
}
}
Modified: tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java?rev=1530583&r1=1530582&r2=1530583&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java (original)
+++ tomee/tomee/trunk/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java Wed Oct 9 12:35:05 2013
@@ -141,7 +141,7 @@ public class MulticastPulseAgentTest {
//Compare URI hosts
int i = compare(u1.getHost(), u2.getHost());
- if (i == 0) {
+ if (i != 0) {
i = uri1.compareTo(uri2);
}