Posted to dev@activemq.apache.org by "Giovanni Toffetti (Commented) (JIRA)" <ji...@apache.org> on 2011/11/09 13:02:51 UTC

[jira] [Commented] (AMQ-3582) CLONE - Duplicate topic messages received with network of brokers and selectors

    [ https://issues.apache.org/jira/browse/AMQ-3582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13146966#comment-13146966 ] 

Giovanni Toffetti commented on AMQ-3582:
----------------------------------------

I cloned the original bug because I don't think the problem is fixed: as soon as there is more than one path (of at least 2 hops) between brokers, message duplication occurs.

Here's a little example:

{code:title=FourBrokerTopicNetworkTest}

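import java.net.URI;
import java.util.HashMap;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;

import junit.framework.Test;

import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.util.MessageIdList;

public class FourBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport implements MessageListener {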
	protected static final int MESSAGE_COUNT = 5;
	public boolean dynamicOnly;

	public void initCombosForTestABandBCbrokerNetworkWithSelectors() {
		addCombinationValues("dynamicOnly", new Object[] { true, false });
	}

	/**
	 * A simple square topology:
	 * BrokerA <-> BrokerB
	 * BrokerA <-> BrokerC
	 * BrokerB <-> BrokerD
	 * BrokerD <-> BrokerC
	 */
	public void testSquareConnectedBrokerNetwork2() throws Exception {
		int networkTTL = 2;
		boolean conduitSubs = true;
		boolean dynamicOnly = false;

		// Setup broker networks
		bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, networkTTL,
				conduitSubs);
		bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL,
				conduitSubs);
		bridgeBrokers("BrokerA", "BrokerC", dynamicOnly, networkTTL,
				conduitSubs);
		bridgeBrokers("BrokerC", "BrokerA", dynamicOnly, networkTTL,
				conduitSubs);

		bridgeBrokers("BrokerD", "BrokerC", dynamicOnly, networkTTL,
				conduitSubs);
		bridgeBrokers("BrokerC", "BrokerD", dynamicOnly, networkTTL,
				conduitSubs);
		bridgeBrokers("BrokerD", "BrokerB", dynamicOnly, networkTTL,
				conduitSubs);
		bridgeBrokers("BrokerB", "BrokerD", dynamicOnly, networkTTL,
				conduitSubs);

		startAllBrokers();

		// Setup destination
		Destination dest = createDestination("TEST.FOO", true);

		// Setup consumers
		MessageConsumer clientA = createConsumer("BrokerA", dest, "msgId > 0");
		MessageConsumer clientB = createConsumer("BrokerB", dest, "msgId > 0");
		MessageConsumer clientC = createConsumer("BrokerC", dest, "msgId > 0");
		MessageConsumer clientD = createConsumer("BrokerD", dest, "msgId > 0");
		// let consumers propagate around the network
		Thread.sleep(5000);
		
		clientD.setMessageListener(this);

		// Send messages
		String[] brokers = { "BrokerA", "BrokerB", "BrokerC", "BrokerD" };
		HashMap<String, Object> props = new HashMap<String, Object>();
		for (String broker : brokers) {
			props.put("sender", broker);
			for (int i = 1; i <= MESSAGE_COUNT; i++) {
				props.put("msgId", i);
				sendMessages(broker, dest, 1, props);
			}
		}

		// Get message count
		MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
		MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
		MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
		MessageIdList msgsD = getConsumerMessages("BrokerD", clientD);

		msgsA.waitForMessagesToArrive(MESSAGE_COUNT * 4);
		msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 4);
		msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 4);
		msgsD.waitForMessagesToArrive(MESSAGE_COUNT * 4);
		
		System.out.println(msgsA.toString());

		assertEquals(MESSAGE_COUNT * 4, msgsA.getMessageCount());
		assertEquals(MESSAGE_COUNT * 4, msgsB.getMessageCount());
		assertEquals(MESSAGE_COUNT * 4, msgsC.getMessageCount());
		assertEquals(MESSAGE_COUNT * 4, msgsD.getMessageCount());
	}

	/**
	 * A simple square topology:
	 * BrokerA <-> BrokerB
	 * BrokerA <-> BrokerC
	 * BrokerB <-> BrokerD
	 * BrokerD <-> BrokerC
	 */
	public void testSquareConnectedBrokerNetwork() throws Exception {
		int networkTTL = 2;
		boolean conduitSubs = true;
		boolean dynamicOnly = false;

		// Setup broker networks
		bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, networkTTL,
				conduitSubs);
		bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL,
				conduitSubs);
		bridgeBrokers("BrokerA", "BrokerC", dynamicOnly, networkTTL,
				conduitSubs);
		bridgeBrokers("BrokerC", "BrokerA", dynamicOnly, networkTTL,
				conduitSubs);

		bridgeBrokers("BrokerD", "BrokerC", dynamicOnly, networkTTL,
				conduitSubs);
		bridgeBrokers("BrokerC", "BrokerD", dynamicOnly, networkTTL,
				conduitSubs);
		bridgeBrokers("BrokerD", "BrokerB", dynamicOnly, networkTTL,
				conduitSubs);
		bridgeBrokers("BrokerB", "BrokerD", dynamicOnly, networkTTL,
				conduitSubs);

		startAllBrokers();

		// Setup destination
		Destination dest = createDestination("TEST.FOO", true);

		// Setup consumers
		MessageConsumer clientA = createConsumer("BrokerA", dest);
		MessageConsumer clientB = createConsumer("BrokerB", dest);
		MessageConsumer clientC = createConsumer("BrokerC", dest);
		MessageConsumer clientD = createConsumer("BrokerD", dest);
		// let consumers propagate around the network
		Thread.sleep(5000);

		// Send messages
		sendMessages("BrokerA", dest, MESSAGE_COUNT);
		sendMessages("BrokerB", dest, MESSAGE_COUNT);
		sendMessages("BrokerC", dest, MESSAGE_COUNT);
		sendMessages("BrokerD", dest, MESSAGE_COUNT);

		// Get message count
		MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
		MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
		MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
		MessageIdList msgsD = getConsumerMessages("BrokerD", clientD);

		msgsA.waitForMessagesToArrive(MESSAGE_COUNT * 4);
		msgsB.waitForMessagesToArrive(MESSAGE_COUNT * 4);
		msgsC.waitForMessagesToArrive(MESSAGE_COUNT * 4);
		msgsD.waitForMessagesToArrive(MESSAGE_COUNT * 4);

		assertEquals(MESSAGE_COUNT * 4, msgsA.getMessageCount());
		assertEquals(MESSAGE_COUNT * 4, msgsB.getMessageCount());
		assertEquals(MESSAGE_COUNT * 4, msgsC.getMessageCount());
		assertEquals(MESSAGE_COUNT * 4, msgsD.getMessageCount());
	}

	public void setUp() throws Exception {
		super.setAutoFail(true);
		super.setUp();
		String options = "?persistent=false&useJmx=false";
		createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" + options));
		createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" + options));
		createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" + options));
		createBroker(new URI("broker:(tcp://localhost:61619)/BrokerD" + options));
	}

	public static Test suite() {
		return suite(FourBrokerTopicNetworkTest.class);
	}

	@Override
	public void onMessage(Message message) {
		try {
			System.err.println(message.getStringProperty("sender") + " msgID:" + message.getIntProperty("msgId") );
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
	}
}

{code}

I don't know if there's anything wrong with this test, or if I should use different configurations of TTL, conduit, and dynamicOnly. I tested it with the latest AMQ I could get (5.5.1).

As you can see by running the test, the number of delivered messages is 25 rather than the expected 20. The reason can be seen in the testSquareConnectedBrokerNetwork2 method: clientD prints every message coming from BrokerA twice, because BrokerB and BrokerC each forward it along a different path.
This is a major problem whenever a broker network has multiple paths, as message duplication becomes so severe that it basically kills the whole thing.
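
To make the arithmetic concrete, here is a rough, self-contained sketch that counts copies in the same square topology. It is only a toy model and an assumption on my side, not ActiveMQ's actual forwarding code: each broker forwards a message to every neighbour that is not already on the message's path, up to networkTTL hops, with no duplicate suppression. The class and method names are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: NOT ActiveMQ code, just path counting on the square topology.
class SquareNetworkDuplicates {
    // Same bidirectional bridges as in the test: A-B, A-C, B-D, C-D.
    static final Map<String, List<String>> NEIGHBOURS = new LinkedHashMap<>();
    static {
        NEIGHBOURS.put("BrokerA", Arrays.asList("BrokerB", "BrokerC"));
        NEIGHBOURS.put("BrokerB", Arrays.asList("BrokerA", "BrokerD"));
        NEIGHBOURS.put("BrokerC", Arrays.asList("BrokerA", "BrokerD"));
        NEIGHBOURS.put("BrokerD", Arrays.asList("BrokerB", "BrokerC"));
    }

    // Number of copies of one message, published at 'origin', that reach each broker.
    static Map<String, Integer> deliveries(String origin, int networkTTL) {
        Map<String, Integer> copies = new LinkedHashMap<>();
        for (String broker : NEIGHBOURS.keySet()) {
            copies.put(broker, 0);
        }
        Deque<List<String>> pending = new ArrayDeque<>();
        pending.add(Arrays.asList(origin));
        while (!pending.isEmpty()) {
            List<String> path = pending.poll();
            String current = path.get(path.size() - 1);
            copies.merge(current, 1, Integer::sum);
            if (path.size() - 1 >= networkTTL) {
                continue; // hop budget used up, do not forward further
            }
            for (String next : NEIGHBOURS.get(current)) {
                if (path.contains(next)) {
                    continue; // never send a message back to a broker it already visited
                }
                List<String> extended = new ArrayList<>(path);
                extended.add(next);
                pending.add(extended);
            }
        }
        return copies;
    }

    // Copies received by one consumer when every broker publishes one message.
    static int copiesPerRound(String consumerBroker, int networkTTL) {
        int total = 0;
        for (String origin : NEIGHBOURS.keySet()) {
            total += deliveries(origin, networkTTL).get(consumerBroker);
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(deliveries("BrokerA", 2));
        System.out.println(copiesPerRound("BrokerD", 2) * 5);
    }
}
```

Under these assumptions a message published on BrokerA reaches BrokerD twice (via BrokerB and via BrokerC), so a consumer on BrokerD gets 5 copies per round of 4 messages, which with MESSAGE_COUNT = 5 is consistent with the 25 observed.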

Please let me know if the test is correct, as I'd like to have some more insight into why this is happening. My colleagues and I also have some ideas about the correct way to fix it.

Regards,

g
                
> CLONE - Duplicate topic messages received with network of brokers and selectors
> -------------------------------------------------------------------------------
>
>                 Key: AMQ-3582
>                 URL: https://issues.apache.org/jira/browse/AMQ-3582
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker, Transport
>    Affects Versions: 5.5.1
>            Reporter: Giovanni Toffetti
>            Assignee: Rob Davies
>         Attachments: ActiveMQActor.java
>
>
> If you create a network of two brokers, A and B, one publisher publishing to A, and n (where n is > 1) receivers with selectors, each receiver receives n messages for every 1 message sent.  The key here is to have a selector.   It would appear that the conduitSubscriptions flag does not work when using selectors.  The conduit does not properly reconcile consumers if they have selectors.  A suggested solution would be that rather than process each selector independently, each selector should be or'ed together and if any selector results in true then a single message should be sent to the other broker.
> In doing research, it would appear that this problem was introduced with bug fix AMQ-810.  Another user reported it via email back to the assignee of AMQ-810 and a short dialog transpired.  See http://www.mail-archive.com/activemq-users@geronimo.apache.org/msg05198.html.  
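
The "or'ed together" suggestion in the description quoted above could look roughly like the sketch below. This is only an illustration of the idea, not how the conduit subscription code is implemented; SelectorConduit and combine are hypothetical names, and the sketch assumes that a consumer without a selector matches every message, so the combined subscription must then carry no selector at all.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, for illustration only.
class SelectorConduit {
    // Builds one JMS selector that is true whenever any of the given selectors is true.
    // Returns null (meaning "no selector") if any consumer subscribes without one.
    static String combine(List<String> selectors) {
        if (selectors.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        List<String> parts = new ArrayList<>();
        for (String selector : selectors) {
            if (selector == null || selector.trim().isEmpty()) {
                return null; // this consumer wants everything
            }
            String wrapped = "(" + selector.trim() + ")";
            if (!parts.contains(wrapped)) {
                parts.add(wrapped); // identical selectors only need to appear once
            }
        }
        return String.join(" OR ", parts);
    }

    public static void main(String[] args) {
        System.out.println(combine(Arrays.asList("msgId > 0", "sender = 'BrokerA'")));
    }
}
```

With a single combined selector per destination, the remote broker would forward each matching message once, and the local broker would still apply each consumer's own selector on delivery.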
