You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/08/24 22:23:31 UTC
[1/3] activemq-artemis git commit: ARTEMIS-697 Avoid self-discovery
Repository: activemq-artemis
Updated Branches:
refs/heads/master 6e8832b6c -> dd0bd97b8
ARTEMIS-697 Avoid self-discovery
This avoids an issue where a broker would discover itself, causing an unexpected behavior when using
core bridges to forward messages:
* Make channel manager a singleton ensuring that only one channel with a given name exists
* Ensure that messages are marked with NON_LOOPBACK to avoid receiving messages originating from
itself
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/bf4796c5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/bf4796c5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/bf4796c5
Branch: refs/heads/master
Commit: bf4796c5d30d90b46b22764d638fc074c026b4b9
Parents: 6e8832b
Author: Ulf Lilleengen <lu...@redhat.com>
Authored: Thu Aug 18 15:46:09 2016 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Aug 24 17:06:55 2016 -0400
----------------------------------------------------------------------
.../apache/activemq/artemis/api/core/jgroups/JChannelManager.java | 2 +-
.../apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java | 2 ++
2 files changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bf4796c5/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java
index f594c07..1db4327 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java
@@ -34,7 +34,7 @@ public class JChannelManager {
private static final Logger logger = Logger.getLogger(JChannelManager.class);
- private Map<String, JChannelWrapper> channels;
+ private static Map<String, JChannelWrapper> channels;
public synchronized JChannelWrapper getJChannel(String channelName,
JGroupsBroadcastEndpoint endpoint) throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/bf4796c5/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java
index 7d64dd4..eb61ffb 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.jboss.logging.Logger;
import org.jgroups.JChannel;
+import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
/**
@@ -127,6 +128,7 @@ public class JChannelWrapper {
public void send(org.jgroups.Message msg) throws Exception {
if (logger.isTraceEnabled()) logger.trace(this + "::Sending JGroups Message: Open=" + channel.isOpen() + " on channel " + channelName + " msg=" + msg);
+ msg.setTransientFlag(Message.TransientFlag.DONT_LOOPBACK);
channel.send(msg);
}
[3/3] activemq-artemis git commit: This closes #731
Posted by jb...@apache.org.
This closes #731
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/dd0bd97b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/dd0bd97b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/dd0bd97b
Branch: refs/heads/master
Commit: dd0bd97b8f9c71e188911a31c18255ba07209add
Parents: 6e8832b 858d7a1
Author: jbertram <jb...@apache.com>
Authored: Wed Aug 24 17:23:11 2016 -0500
Committer: jbertram <jb...@apache.com>
Committed: Wed Aug 24 17:23:11 2016 -0500
----------------------------------------------------------------------
.../core/ChannelBroadcastEndpointFactory.java | 28 +---
.../JGroupsFileBroadcastEndpointFactory.java | 2 +-
...roupsPropertiesBroadcastEndpointFactory.java | 2 +-
.../api/core/jgroups/JChannelManager.java | 38 ++++-
.../api/core/jgroups/JChannelWrapper.java | 20 ++-
.../broadcast/JGroupsBroadcastTest.java | 14 ++
.../integration/discovery/DiscoveryTest.java | 156 +++++++++++--------
7 files changed, 152 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
[2/3] activemq-artemis git commit: ARTEMIS-697 Making JChannelManager
a singleton, and fixing tests
Posted by jb...@apache.org.
ARTEMIS-697 Making JChannelManager a singleton, and fixing tests
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/858d7a1a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/858d7a1a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/858d7a1a
Branch: refs/heads/master
Commit: 858d7a1a02ec18e253a9caba54e8d3cad9f04eed
Parents: bf4796c
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Aug 24 10:31:04 2016 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Aug 24 18:08:17 2016 -0400
----------------------------------------------------------------------
.../core/ChannelBroadcastEndpointFactory.java | 28 +---
.../JGroupsFileBroadcastEndpointFactory.java | 2 +-
...roupsPropertiesBroadcastEndpointFactory.java | 2 +-
.../api/core/jgroups/JChannelManager.java | 38 ++++-
.../api/core/jgroups/JChannelWrapper.java | 20 ++-
.../broadcast/JGroupsBroadcastTest.java | 14 ++
.../integration/discovery/DiscoveryTest.java | 156 +++++++++++--------
7 files changed, 151 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/858d7a1a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java
index af0df2e..66b61d3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ChannelBroadcastEndpointFactory.java
@@ -16,9 +16,6 @@
*/
package org.apache.activemq.artemis.api.core;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
import org.jboss.logging.Logger;
import org.jgroups.JChannel;
@@ -38,32 +35,9 @@ public class ChannelBroadcastEndpointFactory implements BroadcastEndpointFactory
private final JChannelManager manager;
- private static final Map<JChannel, JChannelManager> managers = new ConcurrentHashMap<>();
+ private static final JChannelManager singletonManager = JChannelManager.getInstance();
- private static final JChannelManager singletonManager = new JChannelManager();
-// TODO: To implement this when JForkChannel from JGroups supports multiple channels properly
-//
-// private static JChannelManager recoverManager(JChannel channel) {
-// JChannelManager manager = managers.get(channel);
-// if (manager == null) {
-// if (logger.isTraceEnabled()) {
-// logger.trace("Creating a new JChannelManager for " + channel, new Exception("trace"));
-// }
-// manager = new JChannelManager();
-// managers.put(channel, manager);
-// }
-// else {
-// if (logger.isTraceEnabled()) {
-// logger.trace("Recover an already existent channelManager for " + channel, new Exception("trace"));
-// }
-//
-// }
-//
-// return manager;
-// }
-//
public ChannelBroadcastEndpointFactory(JChannel channel, String channelName) {
- // TODO: use recoverManager(channel)
this(singletonManager, channel, channelName);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/858d7a1a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java
index 9f783e7..f560c71 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsFileBroadcastEndpointFactory.java
@@ -25,7 +25,7 @@ public class JGroupsFileBroadcastEndpointFactory implements BroadcastEndpointFac
private String channelName;
- private final JChannelManager manager = new JChannelManager();
+ private final JChannelManager manager = JChannelManager.getInstance();
@Override
public BroadcastEndpoint createBroadcastEndpoint() throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/858d7a1a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java
index 8ed03ab..05867d7 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JGroupsPropertiesBroadcastEndpointFactory.java
@@ -24,7 +24,7 @@ public class JGroupsPropertiesBroadcastEndpointFactory implements BroadcastEndpo
private String channelName;
- private final JChannelManager manager = new JChannelManager();
+ private final JChannelManager manager = JChannelManager.getInstance();
@Override
public BroadcastEndpoint createBroadcastEndpoint() throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/858d7a1a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java
index 1db4327..682bf76 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelManager.java
@@ -32,15 +32,43 @@ import org.jboss.logging.Logger;
*/
public class JChannelManager {
- private static final Logger logger = Logger.getLogger(JChannelManager.class);
+ private static final JChannelManager theInstance = new JChannelManager();
- private static Map<String, JChannelWrapper> channels;
+ public static JChannelManager getInstance() {
+ return theInstance;
+ }
+
+ private JChannelManager() {
+ }
+
+ public synchronized JChannelManager clear() {
+ for (JChannelWrapper wrapper : channels.values()) {
+ wrapper.closeChannel();
+ }
+ channels.clear();
+ setLoopbackMessages(false);
+ return this;
+ }
+
+ // if true, messages will be loopbacked
+ // this is useful for testcases using a single channel.
+ private boolean loopbackMessages = false;
+
+ private final Logger logger = Logger.getLogger(JChannelManager.class);
+
+ private static final Map<String, JChannelWrapper> channels = new HashMap<>();
+
+ public boolean isLoopbackMessages() {
+ return loopbackMessages;
+ }
+
+ public JChannelManager setLoopbackMessages(boolean loopbackMessages) {
+ this.loopbackMessages = loopbackMessages;
+ return this;
+ }
public synchronized JChannelWrapper getJChannel(String channelName,
JGroupsBroadcastEndpoint endpoint) throws Exception {
- if (channels == null) {
- channels = new HashMap<>();
- }
JChannelWrapper wrapper = channels.get(channelName);
if (wrapper == null) {
wrapper = new JChannelWrapper(this, channelName, endpoint.createChannel());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/858d7a1a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java
index eb61ffb..e83a33d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/jgroups/JChannelWrapper.java
@@ -86,15 +86,21 @@ public class JChannelWrapper {
if (logger.isTraceEnabled()) logger.trace(this + "::RefCount-- " + refCount + " on channel " + channelName, new Exception("Trace"));
if (refCount == 0) {
if (closeWrappedChannel) {
- connected = false;
- channel.setReceiver(null);
- logger.trace(this + "::Closing Channel: " + channelName, new Exception("Trace"));
- channel.close();
- manager.removeChannel(channelName);
+ closeChannel();
}
+ manager.removeChannel(channelName);
}
}
+ public synchronized void closeChannel() {
+ connected = false;
+ channel.setReceiver(null);
+ if (logger.isTraceEnabled()) {
+ logger.trace(this + "::Closing Channel: " + channelName, new Exception("Trace"));
+ }
+ channel.close();
+ }
+
public void removeReceiver(JGroupsReceiver receiver) {
if (logger.isTraceEnabled()) logger.trace(this + "::removeReceiver: " + receiver + " on " + channelName, new Exception("Trace"));
synchronized (receivers) {
@@ -128,7 +134,9 @@ public class JChannelWrapper {
public void send(org.jgroups.Message msg) throws Exception {
if (logger.isTraceEnabled()) logger.trace(this + "::Sending JGroups Message: Open=" + channel.isOpen() + " on channel " + channelName + " msg=" + msg);
- msg.setTransientFlag(Message.TransientFlag.DONT_LOOPBACK);
+ if (!manager.isLoopbackMessages()) {
+ msg.setTransientFlag(Message.TransientFlag.DONT_LOOPBACK);
+ }
channel.send(msg);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/858d7a1a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java
index 53a6783..5bf36e9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/broadcast/JGroupsBroadcastTest.java
@@ -19,15 +19,29 @@ package org.apache.activemq.artemis.tests.integration.broadcast;
import org.apache.activemq.artemis.api.core.BroadcastEndpoint;
import org.apache.activemq.artemis.api.core.BroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.ChannelBroadcastEndpointFactory;
+import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule;
import org.jgroups.JChannel;
import org.jgroups.conf.PlainConfigurator;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
public class JGroupsBroadcastTest {
+ @After
+ public void cleanupJChannel() {
+ JChannelManager.getInstance().clear();
+ }
+
+ @Before
+ public void prepareJChannel() {
+ JChannelManager.getInstance().setLoopbackMessages(true);
+ }
+
+
@Rule
public ThreadLeakCheckRule threadLeakCheckRule = new ThreadLeakCheckRule();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/858d7a1a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java
index 0c667a8..a1faedc 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryTest.java
@@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.JGroupsFileBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
+import org.apache.activemq.artemis.api.core.jgroups.JChannelManager;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.tests.integration.SimpleNotificationService;
import org.apache.activemq.artemis.core.cluster.DiscoveryEntry;
@@ -42,6 +43,7 @@ import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
/**
@@ -70,9 +72,15 @@ public class DiscoveryTest extends DiscoveryBaseTest {
BroadcastGroup bg = null, bg1 = null, bg2 = null, bg3 = null;
DiscoveryGroup dg = null, dg1 = null, dg2 = null, dg3 = null;
+ @Before
+ public void prepareLoopback() {
+ JChannelManager.getInstance().setLoopbackMessages(true);
+ }
+
@Override
@After
public void tearDown() throws Exception {
+ JChannelManager.getInstance().clear().setLoopbackMessages(false);
/** This file path is defined at {@link #TEST_JGROUPS_CONF_FILE} */
deleteDirectory(new File("./target/tmp/amqtest.ping.dir"));
for (ActiveMQComponent component : new ActiveMQComponent[]{bg, bg1, bg2, bg3, dg, dg1, dg2, dg3}) {
@@ -140,47 +148,52 @@ public class DiscoveryTest extends DiscoveryBaseTest {
BroadcastEndpoint broadcaster = factory.createBroadcastEndpoint();
broadcaster.openBroadcaster();
- int num = 100;
- BroadcastEndpoint[] receivers = new BroadcastEndpoint[num];
- for (int i = 0; i < num; i++) {
- receivers[i] = factory.createBroadcastEndpoint();
- receivers[i].openClient();
- }
+ try {
- final byte[] data = new byte[]{1, 2, 3, 4, 5};
- broadcaster.broadcast(data);
+ int num = 100;
+ BroadcastEndpoint[] receivers = new BroadcastEndpoint[num];
+ for (int i = 0; i < num; i++) {
+ receivers[i] = factory.createBroadcastEndpoint();
+ receivers[i].openClient();
+ }
- for (int i = 0; i < num; i++) {
- byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS);
- assertNotNull(received);
- assertEquals(5, received.length);
- assertEquals(1, received[0]);
- assertEquals(2, received[1]);
- assertEquals(3, received[2]);
- assertEquals(4, received[3]);
- assertEquals(5, received[4]);
- }
+ final byte[] data = new byte[]{1, 2, 3, 4, 5};
+ broadcaster.broadcast(data);
+
+ for (int i = 0; i < num; i++) {
+ byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS);
+ assertNotNull(received);
+ assertEquals(5, received.length);
+ assertEquals(1, received[0]);
+ assertEquals(2, received[1]);
+ assertEquals(3, received[2]);
+ assertEquals(4, received[3]);
+ assertEquals(5, received[4]);
+ }
- for (int i = 0; i < num - 1; i++) {
- receivers[i].close(false);
- }
+ for (int i = 0; i < num - 1; i++) {
+ receivers[i].close(false);
+ }
- byte[] data1 = receivers[num - 1].receiveBroadcast(5, TimeUnit.SECONDS);
- assertNull(data1);
+ byte[] data1 = receivers[num - 1].receiveBroadcast(5, TimeUnit.SECONDS);
+ assertNull(data1);
- broadcaster.broadcast(data);
- data1 = receivers[num - 1].receiveBroadcast(5, TimeUnit.SECONDS);
+ broadcaster.broadcast(data);
+ data1 = receivers[num - 1].receiveBroadcast(5, TimeUnit.SECONDS);
- assertNotNull(data1);
- assertEquals(5, data1.length);
- assertEquals(1, data1[0]);
- assertEquals(2, data1[1]);
- assertEquals(3, data1[2]);
- assertEquals(4, data1[3]);
- assertEquals(5, data1[4]);
+ assertNotNull(data1);
+ assertEquals(5, data1.length);
+ assertEquals(1, data1[0]);
+ assertEquals(2, data1[1]);
+ assertEquals(3, data1[2]);
+ assertEquals(4, data1[3]);
+ assertEquals(5, data1[4]);
- receivers[num - 1].close(false);
- broadcaster.close(true);
+ receivers[num - 1].close(false);
+ }
+ finally {
+ broadcaster.close(true);
+ }
}
/**
@@ -195,7 +208,6 @@ public class DiscoveryTest extends DiscoveryBaseTest {
BroadcastEndpointFactory factory = new JGroupsFileBroadcastEndpointFactory().setChannelName("tst").setFile(TEST_JGROUPS_CONF_FILE);
BroadcastEndpoint broadcaster = factory.createBroadcastEndpoint();
broadcaster.openBroadcaster();
-
int num = 50;
BroadcastEndpoint[] receivers = new BroadcastEndpoint[num];
for (int i = 0; i < num; i++) {
@@ -203,47 +215,53 @@ public class DiscoveryTest extends DiscoveryBaseTest {
receivers[i].openClient();
}
- final byte[] data = new byte[]{1, 2, 3, 4, 5};
- broadcaster.broadcast(data);
- for (int i = 0; i < num; i++) {
- byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS);
- assertNotNull(received);
- assertEquals(5, received.length);
- assertEquals(1, received[0]);
- assertEquals(2, received[1]);
- assertEquals(3, received[2]);
- assertEquals(4, received[3]);
- assertEquals(5, received[4]);
- }
+ try {
- for (int i = 0; i < num; i++) {
- receivers[i].close(false);
- }
+ final byte[] data = new byte[]{1, 2, 3, 4, 5};
+ broadcaster.broadcast(data);
+
+ for (int i = 0; i < num; i++) {
+ byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS);
+ assertNotNull(received);
+ assertEquals(5, received.length);
+ assertEquals(1, received[0]);
+ assertEquals(2, received[1]);
+ assertEquals(3, received[2]);
+ assertEquals(4, received[3]);
+ assertEquals(5, received[4]);
+ }
- //new ones
- for (int i = 0; i < num; i++) {
- receivers[i] = factory.createBroadcastEndpoint();
- receivers[i].openClient();
- }
+ for (int i = 0; i < num; i++) {
+ receivers[i].close(false);
+ }
- broadcaster.broadcast(data);
+ //new ones
+ for (int i = 0; i < num; i++) {
+ receivers[i] = factory.createBroadcastEndpoint();
+ receivers[i].openClient();
+ }
- for (int i = 0; i < num; i++) {
- byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS);
- assertNotNull(received);
- assertEquals(5, received.length);
- assertEquals(1, received[0]);
- assertEquals(2, received[1]);
- assertEquals(3, received[2]);
- assertEquals(4, received[3]);
- assertEquals(5, received[4]);
- }
+ broadcaster.broadcast(data);
+
+ for (int i = 0; i < num; i++) {
+ byte[] received = receivers[i].receiveBroadcast(5000, TimeUnit.MILLISECONDS);
+ assertNotNull(received);
+ assertEquals(5, received.length);
+ assertEquals(1, received[0]);
+ assertEquals(2, received[1]);
+ assertEquals(3, received[2]);
+ assertEquals(4, received[3]);
+ assertEquals(5, received[4]);
+ }
- for (int i = 0; i < num; i++) {
- receivers[i].close(false);
}
- broadcaster.close(true);
+ finally {
+ for (int i = 0; i < num; i++) {
+ receivers[i].close(false);
+ }
+ broadcaster.close(true);
+ }
}
/**