You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by an...@apache.org on 2013/11/19 15:36:26 UTC
svn commit: r1543440 - in /tomee/tomee/trunk/server:
openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
Author: andygumbrecht
Date: Tue Nov 19 14:36:26 2013
New Revision: 1543440
URL: http://svn.apache.org/r1543440
Log:
Improve multipulse discovery - Server agent now pulses response more efficiently.
Modified:
tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
Modified: tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java?rev=1543440&r1=1543439&r2=1543440&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java (original)
+++ tomee/tomee/trunk/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java Tue Nov 19 14:36:26 2013
@@ -18,7 +18,9 @@ import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Comparator;
+import java.util.ConcurrentModificationException;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
@@ -79,8 +81,15 @@ public class MulticastPulseClient extend
}
private static synchronized ExecutorService getExecutorService() {
+
if (null == executor) {
- executor = Executors.newFixedThreadPool(getInterfaces().length + 2);
+
+ int length = getInterfaces().length;
+ if (length < 1) {
+ length = 1;
+ }
+
+ executor = Executors.newFixedThreadPool(length * 2);
}
return executor;
@@ -194,6 +203,7 @@ public class MulticastPulseClient extend
final DatagramPacket request = new DatagramPacket(bytes, bytes.length, new InetSocketAddress(ia, port));
final AtomicBoolean running = new AtomicBoolean(true);
+ final List<Future> futures = Collections.synchronizedList(new ArrayList<Future>());
MulticastSocket[] clientSockets = null;
@@ -252,7 +262,6 @@ public class MulticastPulseClient extend
//Start threads that listen for multicast packets on our channel.
//These need to start 'before' we pulse a request.
- final ArrayList<Future> futures = new ArrayList<Future>();
final CountDownLatch latchListeners = new CountDownLatch(clientSocketsFinal.length);
for (final MulticastSocket socket : clientSocketsFinal) {
@@ -380,7 +389,7 @@ public class MulticastPulseClient extend
try {
//Give listener threads a reasonable amount of time to start
- if (latchListeners.await(5, TimeUnit.SECONDS)) {
+ if (latchListeners.await(clientSocketsFinal.length * 2, TimeUnit.SECONDS)) {
//Start pulsing client request every 10ms - This will ensure we have at least 4 client pulses within our minimum timeout
//This pulse is designed to tell a listening server to wake up and pulse back a response
@@ -428,23 +437,14 @@ public class MulticastPulseClient extend
running.set(false);
- for (final Future future : futures) {
- future.cancel(true);
- }
-
- for (final MulticastSocket socket : clientSocketsFinal) {
-
- try {
- socket.leaveGroup(ia);
- } catch (Exception e) {
- //Ignore
- }
- try {
- socket.close();
- } catch (Exception e) {
- //Ignore
+ try {
+ for (final Future future : futures) {
+ future.cancel(true);
}
+ } catch (ConcurrentModificationException e) {
+ //Ignore
}
+
}
}, timeout);
@@ -464,6 +464,18 @@ public class MulticastPulseClient extend
setLock.unlock();
}
} finally {
+
+ //Just to be sure we are clean
+ for (final Future future : futures) {
+ try {
+ future.cancel(true);
+ } catch (Exception e) {
+ //Ignore
+ }
+ }
+
+ futures.clear();
+
for (final MulticastSocket socket : clientSockets) {
try {
Modified: tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
URL: http://svn.apache.org/viewvc/tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java?rev=1543440&r1=1543439&r2=1543440&view=diff
==============================================================================
--- tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java (original)
+++ tomee/tomee/trunk/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java Tue Nov 19 14:36:26 2013
@@ -38,6 +38,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
@@ -58,8 +59,8 @@ import java.util.concurrent.atomic.Atomi
public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfManaging {
private static final Logger log = Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("discovery").createChild("multipulse"), MulticastPulseAgent.class);
- private static final NetworkInterface[] interfaces = getNetworkInterfaces();
- private static final ExecutorService executor = Executors.newFixedThreadPool((interfaces.length + 2) * 2);
+ private static NetworkInterface[] interfaces = null;
+ private static ExecutorService executor = null;
private static final Charset UTF8 = Charset.forName("UTF-8");
private static final int TTL = Integer.parseInt(System.getProperty("org.apache.openejb.multipulse.ttl", "32"));
@@ -71,6 +72,7 @@ public class MulticastPulseAgent impleme
private final Set<URI> uriSet = new HashSet<URI>();
private AtomicBoolean running = new AtomicBoolean(false);
final ArrayList<Future> futures = new ArrayList<Future>();
+ final ArrayList<Future> senders = new ArrayList<Future>();
private MulticastSocket[] sockets = null;
private InetSocketAddress address = null;
@@ -92,6 +94,29 @@ public class MulticastPulseAgent impleme
public MulticastPulseAgent() {
}
+ private static synchronized NetworkInterface[] getInterfaces() {
+ if (null == interfaces) {
+ interfaces = getNetworkInterfaces();
+ }
+
+ return interfaces;
+ }
+
+ private static synchronized ExecutorService getExecutorService() {
+
+ if (null == executor) {
+
+ int length = getNetworkInterfaces().length;
+ if (length < 1) {
+ length = 1;
+ }
+
+ executor = Executors.newFixedThreadPool(length * 2);
+ }
+
+ return executor;
+ }
+
@Override
public void init(final Properties p) throws Exception {
final Options o = new Options(p);
@@ -124,7 +149,7 @@ public class MulticastPulseAgent impleme
private void buildPacket() throws SocketException {
this.loopbackOnly = true;
- for (final URI uri : uriSet) {
+ for (final URI uri : this.uriSet) {
if (!isLoopback(uri.getHost())) {
this.loopbackOnly = false;
break;
@@ -151,10 +176,13 @@ public class MulticastPulseAgent impleme
final byte[] bytes = (sb.toString()).getBytes(UTF8);
this.response = new DatagramPacket(bytes, bytes.length, this.address);
- log.debug("MultiPulse packet is: " + sb);
+ if (log.isDebugEnabled()) {
+ log.debug("MultiPulse packet is: " + sb);
+ }
if (bytes.length > 2048) {
- log.warning("MultiPulse packet is larger than 2048 bytes, clients will not be able to read the packet");
+ log.warning("MultiPulse packet is larger than 2048 bytes, clients will not be able to read the packet" +
+ "\n - You should define the 'ignore' property to filter out unreachable addresses: " + sb);
}
}
@@ -201,7 +229,7 @@ public class MulticastPulseAgent impleme
private void fireEvent(final URI uri, final boolean add) {
if (null != this.listener) {
- executor.execute(new Runnable() {
+ getExecutorService().execute(new Runnable() {
@Override
public void run() {
if (add) {
@@ -225,13 +253,24 @@ public class MulticastPulseAgent impleme
}
final CountDownLatch latch = new CountDownLatch(this.sockets.length);
- final String mpg = MulticastPulseAgent.this.group;
- final boolean isLoopBackOnly = MulticastPulseAgent.this.loopbackOnly;
- final DatagramPacket mpr = MulticastPulseAgent.this.response;
+ final String mpg = this.group;
+ final boolean isLoopBackOnly = this.loopbackOnly;
+ final ExecutorService executorService = getExecutorService();
for (final MulticastSocket socket : this.sockets) {
- this.futures.add(executor.submit(new Runnable() {
+ final String socketKey;
+ try {
+ socketKey = socket.getNetworkInterface().toString();
+ } catch (SocketException e) {
+ log.error("Failed to get network interface name on: " + socket, e);
+ continue;
+ }
+
+ final Sender sender = new Sender(this, socketKey, socket, this.response);
+ this.futures.add(executorService.submit(sender));
+
+ this.futures.add(executorService.submit(new Runnable() {
@Override
public void run() {
@@ -246,60 +285,37 @@ public class MulticastPulseAgent impleme
if (null != sa) {
- final String req = new String(request.getData(), 0, request.getLength());
+ String req = new String(request.getData(), 0, request.getLength());
- executor.execute(new Runnable() {
- @Override
- public void run() {
+ if (req.startsWith(CLIENT)) {
- String s = req;
+ req = (req.replace(CLIENT, ""));
- if (s.startsWith(CLIENT)) {
+ if (mpg.equals(req) || "*".equals(req)) {
- s = (s.replace(CLIENT, ""));
-
- if (mpg.equals(s) || "*".equals(s)) {
-
- final String client = ((InetSocketAddress) sa).getAddress().getHostAddress();
-
- if (isLoopBackOnly) {
- //We only have local services, so make sure the request is from a local source else ignore it
- if (!MulticastPulseAgent.isLocalAddress(client, false)) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Ignoring remote client %1$s pulse request for group: %2$s - No remote services available",
- client,
- s));
- }
- return;
- }
- }
+ final String client = ((InetSocketAddress) sa).getAddress().getHostAddress();
+ if (isLoopBackOnly) {
+ //We only have local services, so make sure the request is from a local source else ignore it
+ if (!MulticastPulseAgent.isLocalAddress(client, false)) {
if (log.isDebugEnabled()) {
- log.debug(String.format("Answering client %1$s pulse request for group: %2$s", client, s));
- }
-
- //This is a valid client request for the server to respond on the same channel.
- //Because multicast is not guaranteed we will send 3 responses per valid request at 10ms intervals.
- for (int i = 0; i < 3; i++) {
-
- try {
- socket.send(mpr);
- } catch (Exception e) {
- if (log.isDebugEnabled()) {
- log.debug("MulticastPulseAgent client error: " + e.getMessage(), e);
- }
- }
-
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- break;
- }
+ log.debug(String.format("Ignoring remote client %1$s pulse request for group: %2$s - No remote services available",
+ client,
+ req));
}
+ return;
}
}
+
+ //We have received a valid pulse request
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Answering client '%1$s' pulse request for group: '%2$s' on '%3$s'", client, req, socketKey));
+ }
+
+ //Renew response pulse
+ sender.pulseResponse();
}
- });
+ }
}
@@ -356,7 +372,7 @@ public class MulticastPulseAgent impleme
if (null != this.sockets) {
try {
- for (final MulticastSocket s : sockets) {
+ for (final MulticastSocket s : this.sockets) {
try {
s.close();
} catch (Throwable e) {
@@ -425,7 +441,7 @@ public class MulticastPulseAgent impleme
final ArrayList<MulticastSocket> list = new ArrayList<MulticastSocket>();
- for (final NetworkInterface ni : interfaces) {
+ for (final NetworkInterface ni : getNetworkInterfaces()) {
MulticastSocket ms = null;
@@ -453,7 +469,6 @@ public class MulticastPulseAgent impleme
//Ignore
}
}
-
}
}
@@ -590,4 +605,72 @@ public class MulticastPulseAgent impleme
return sb.toString();
}
+
+ private static class Sender implements Runnable {
+
+ private final AtomicInteger counter = new AtomicInteger(0);
+ private final MulticastPulseAgent agent;
+ private final String socketKey;
+ private final MulticastSocket socket;
+ private final DatagramPacket mpr;
+
+ private Sender(final MulticastPulseAgent agent, final String socketKey, final MulticastSocket socket, final DatagramPacket mpr) {
+ this.agent = agent;
+ this.socketKey = socketKey;
+ this.socket = socket;
+ this.mpr = mpr;
+ }
+
+ @Override
+ public void run() {
+ while (this.agent.running.get()) {
+
+ synchronized (this.counter) {
+ try {
+ //Wait indefinitely until we are interrupted or notified
+ this.counter.wait();
+ } catch (InterruptedException e) {
+ if (!this.agent.running.get()) {
+ break;
+ }
+ }
+ }
+
+ //Pulse a response every 10ms until our counter is 0 (at least 1 second)
+ while (this.counter.decrementAndGet() > 0) {
+
+ try {
+ this.socket.send(this.mpr);
+ } catch (Exception e) {
+ if (log.isDebugEnabled()) {
+ log.debug("MulticastPulseAgent client error: " + e.getMessage(), e);
+ }
+ }
+
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * Renew the counter and notify to pulse response
+ */
+ private void pulseResponse() {
+
+ synchronized (this.counter) {
+
+ this.counter.set(100);
+ this.counter.notifyAll();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return this.socketKey;
+ }
+ }
}