You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by an...@apache.org on 2015/01/27 18:19:04 UTC
tomee git commit: #TOMEE-1500 - MultiPulse bad URI now fires even if ignored
Repository: tomee
Updated Branches:
refs/heads/tomee-1.7.x efbb99651 -> 2e2031eba
#TOMEE-1500 - MultiPulse bad URI now fires even if ignored
Project: http://git-wip-us.apache.org/repos/asf/tomee/repo
Commit: http://git-wip-us.apache.org/repos/asf/tomee/commit/2e2031eb
Tree: http://git-wip-us.apache.org/repos/asf/tomee/tree/2e2031eb
Diff: http://git-wip-us.apache.org/repos/asf/tomee/diff/2e2031eb
Branch: refs/heads/tomee-1.7.x
Commit: 2e2031eba781357b48f7f5cc6ff3e14a51353f79
Parents: efbb996
Author: andygumbrecht <an...@apache.org>
Authored: Tue Jan 27 18:18:33 2015 +0100
Committer: andygumbrecht <an...@apache.org>
Committed: Tue Jan 27 18:18:33 2015 +0100
----------------------------------------------------------------------
.../server/discovery/MulticastPulseAgent.java | 55 ++++++++++----------
.../discovery/MulticastPulseAgentTest.java | 33 +++++++-----
2 files changed, 47 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tomee/blob/2e2031eb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
----------------------------------------------------------------------
diff --git a/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java b/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
index 04dc79c..eb91edd 100644
--- a/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
+++ b/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
@@ -75,9 +75,8 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM
private final ReentrantLock lock = new ReentrantLock();
private final Set<String> ignore = Collections.synchronizedSet(new HashSet<String>());
private final Set<URI> uriSet = new HashSet<URI>();
- private AtomicBoolean running = new AtomicBoolean(false);
+ private final AtomicBoolean running = new AtomicBoolean(false);
final ArrayList<Future> futures = new ArrayList<Future>();
- final ArrayList<Future> senders = new ArrayList<Future>();
private MulticastSocket[] sockets = null;
private InetSocketAddress address = null;
@@ -89,7 +88,6 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM
private boolean loopbackOnly = true;
/**
- * @author Andy Gumbrecht
* This agent listens for client pulses on a defined multicast channel.
* On receipt of a valid pulse the agent responds with its own pulse for
* a defined amount of time and rate. A client can deliver a pulse as often as
@@ -198,7 +196,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM
if (bytes.length > 2048) {
log.warning("MultiPulse packet is larger than 2048 bytes, clients will not be able to read the packet" +
- "\n - You should define the 'ignore' property to filter out unreachable addresses: " + sb);
+ "\n - You should define the 'ignore' property to filter out unreachable addresses: " + sb);
}
} finally {
l.unlock();
@@ -263,13 +261,14 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM
private void fireEvent(final URI uri, final boolean add) {
if (null != this.listener) {
+ final DiscoveryListener dl = this.listener;
getExecutorService().execute(new Runnable() {
@Override
public void run() {
if (add) {
- MulticastPulseAgent.this.listener.serviceAdded(uri);
+ dl.serviceAdded(uri);
} else {
- MulticastPulseAgent.this.listener.serviceRemoved(uri);
+ dl.serviceRemoved(uri);
}
}
});
@@ -290,6 +289,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM
final String mpg = this.group;
final boolean isLoopBackOnly = this.loopbackOnly;
final ExecutorService executorService = getExecutorService();
+ final MulticastPulseAgent agent = MulticastPulseAgent.this;
for (final MulticastSocket socket : this.sockets) {
@@ -311,7 +311,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM
final DatagramPacket request = new DatagramPacket(new byte[2048], 2048);
latch.countDown();
- while (MulticastPulseAgent.this.running.get()) {
+ while (agent.running.get()) {
try {
socket.receive(request);
@@ -338,24 +338,25 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM
if (mpg.equals(req) || "*".equals(req)) {
//Is there a bad url and is it this agent broadcasting the bad URI?
- if (null != badUri && getHosts(MulticastPulseAgent.this.ignore).contains(badUri)) {
- final ReentrantLock l = MulticastPulseAgent.this.lock;
- l.lock();
-
- try {
- //Remove it and rebuild our broadcast packet
- if (MulticastPulseAgent.this.ignore.add(badUri)) {
- MulticastPulseAgent.this.buildPacket();
-
- MulticastPulseAgent.this.fireEvent(URI.create("OpenEJB" + BADURI + badUri), false);
-
- log.warning("This server has removed the unreachable host '" + badUri + "' from discovery, you should consider adding" +
- " this to the 'ignore' property in the multipulse.properties file");
+ if (null != badUri) {
+ if (getHosts(agent.ignore).contains(badUri)) {
+ final ReentrantLock l = agent.lock;
+ l.lock();
+
+ try {
+ //Remove it and rebuild our broadcast packet
+ if (agent.ignore.add(badUri)) {
+ agent.buildPacket();
+ log.warning("This server has removed the unreachable host '" + badUri + "' from discovery, you should consider adding" +
+ " this to the 'ignore' property in the multipulse.properties file");
+ }
+ } finally {
+ l.unlock();
}
-
- } finally {
- l.unlock();
}
+
+ agent.fireEvent(URI.create("OpenEJB" + BADURI + badUri), false);
+
} else {
//Normal client multicast pulse request
@@ -365,8 +366,8 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM
//We only have local services, so make sure the request is from a local source else ignore it
if (log.isDebugEnabled()) {
log.debug(String.format("Ignoring remote client %1$s pulse request for group: %2$s - No remote services available",
- client,
- req));
+ client,
+ req));
}
} else {
@@ -660,7 +661,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM
final InetAddress localhost = InetAddress.getLocalHost();
hosts.add(localhost.getHostAddress());
//Multi-homed
- final InetAddress[] all = InetAddress.getAllByName(localhost.getCanonicalHostName());
+ final InetAddress[] all = InetAddress.getAllByName(localhost.getHostName());
for (final InetAddress ip : all) {
if (ip.isLinkLocalAddress() || ip.isMulticastAddress()) {
@@ -670,7 +671,7 @@ public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfM
final String ha = ip.getHostAddress();
if (!ha.replace("[", "").startsWith("2001:0:")) { //Filter Teredo
hosts.add(ha);
- hosts.add(ip.getCanonicalHostName());
+ hosts.add(ip.getHostName());
}
}
} catch (final UnknownHostException e) {
http://git-wip-us.apache.org/repos/asf/tomee/blob/2e2031eb/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
----------------------------------------------------------------------
diff --git a/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java b/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
index f1567f4..a9d3aa4 100644
--- a/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
+++ b/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
@@ -200,7 +200,7 @@ public class MulticastPulseAgentTest {
final SocketAddress sa = response.getSocketAddress();
- if (null != sa && (sa instanceof InetSocketAddress)) {
+ if ((sa instanceof InetSocketAddress)) {
int len = response.getLength();
if (len > 2048) {
@@ -226,9 +226,9 @@ public class MulticastPulseAgentTest {
final String[] hosts = s.split(",");
System.out.println(String.format("\n" + name + " received Server pulse:\n\tGroup: %1$s\n\tServices: %2$s\n\tServer: %3$s\n",
- group,
- services,
- s));
+ group,
+ services,
+ s));
for (final String svc : serviceList) {
@@ -417,6 +417,9 @@ public class MulticastPulseAgentTest {
final String[] hosts = agent.getHosts().split(",");
final String host = hosts[hosts.length - 1];
+ boolean removed = agent.removeFromIgnore(host);
+ org.junit.Assert.assertTrue("Host is already ignored", !removed);
+
final Future<?> future = executor.submit(new Runnable() {
@Override
public void run() {
@@ -428,13 +431,16 @@ public class MulticastPulseAgentTest {
final MulticastSocket[] multicastSockets = MulticastPulseAgent.getSockets(MulticastPulseAgentTest.host, port);
- for (final MulticastSocket socket : multicastSockets) {
+ for (int i = 0; i < 5; i++) {
+ for (final MulticastSocket socket : multicastSockets) {
- try {
- socket.send(request);
- } catch (final Exception e) {
- System.out.println("Failed to broadcast bad URI on: " + socket.getInterface().getHostAddress());
- e.printStackTrace();
+ try {
+ socket.send(request);
+ Thread.sleep(100);
+ } catch (final Exception e) {
+ System.out.println("Failed to broadcast bad URI on: " + socket.getInterface().getHostAddress());
+ e.printStackTrace();
+ }
}
}
} catch (final Exception e) {
@@ -444,14 +450,13 @@ public class MulticastPulseAgentTest {
}
});
- final Object o = future.get(10, TimeUnit.SECONDS);
-
final boolean await = latch.await(20, TimeUnit.SECONDS);
- final boolean removed = agent.removeFromIgnore(host);
+ removed = agent.removeFromIgnore(host);
agent.setDiscoveryListener(original);
- org.junit.Assert.assertTrue("Failed to remove host", removed && await);
+ org.junit.Assert.assertTrue("Failed to remove host", removed);
+ org.junit.Assert.assertTrue("Failed to unlatch", await);
}
private String ipFormat(final String h) throws UnknownHostException {