Posted to users@activemq.apache.org by Oleg Dulin <ol...@gmail.com> on 2013/08/14 20:59:55 UTC

Broker federation and messages not getting forwarded

Dear Distinguished Colleagues:

I have the following configuration.

I have broker1 (7.107) set up as follows:

<transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:3200"/>
</transportConnectors>

My second broker (7.106) is set up like this:

<transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:3200"/>
</transportConnectors>
<networkConnectors>
    <networkConnector name="connector106.107"
                      uri="static:(tcp://192.168.7.107:3200)"
                      duplex="true"/>
</networkConnectors>

Once in a while, but consistently, the federation gets into a funny 
state where if the consumers are on 106 but the producer is on 107, 106 
doesn't get any messages.

The same happens if the producer is on 106 and the consumers are on 107.

If they are all on the same broker, either one, all is well.

Once I restart the brokers, everything is fine for a while, until it is 
not. Usually a moderate to heavy volume of messages is enough to get 
the system into this state.

I am convinced it is either a misconfiguration on my part, or a bug in 
ActiveMQ. Hopefully, this is just a misconfiguration.

Here is the test code. I change the IPs for both consumers and producer 
to test it out.

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class FederationTest
{
	public static void main(String args[]) throws JMSException, InterruptedException
	{
		// Consumer c1 connects to broker 107.
		ActiveMQConnectionFactory factory1 = new ActiveMQConnectionFactory("failover:(tcp://192.168.7.107:3200)");
		factory1.setUseAsyncSend(true);
		initConsumer("c1", factory1);

		// Consumer c2 connects to broker 106.
		ActiveMQConnectionFactory factory2 = new ActiveMQConnectionFactory("failover:(tcp://192.168.7.106:3200)");
		factory2.setUseAsyncSend(true);
		initConsumer("c2", factory2);

		// The producer connects to broker 107.
		ActiveMQConnectionFactory factory3 = new ActiveMQConnectionFactory("failover:(tcp://192.168.7.107:3200)");
		factory3.setUseAsyncSend(true);
		initProducer(factory3);

		// Keep the main thread alive while the producer and consumers run.
		while (true)
		{
			Thread.sleep(1000);
		}
	}

	// Starts a background thread that sends one text message per second to queue TEST.
	private static void initProducer(ActiveMQConnectionFactory factory) throws JMSException
	{
		QueueConnection queueConnection1 = factory.createQueueConnection();
		queueConnection1.start();
		final QueueSession qs1 = queueConnection1.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
		Queue q1 = qs1.createQueue("TEST");
		final QueueSender qSender = qs1.createSender(q1);
		new Thread(new Runnable() {

			@Override
			public void run()
			{
				int counter = 0;
				while (true)
				{
					try
					{
						TextMessage msg = qs1.createTextMessage();
						msg.setText("counter=" + counter);
						System.out.println("p:" + counter);
						counter++;
						qSender.send(msg);

						Thread.sleep(1000);
					}
					catch (Exception exp)
					{
						exp.printStackTrace();
					}
				}
			}

		}).start();
	}

	// Registers a listener on queue TEST that prints each message, prefixed with the consumer name.
	public static void initConsumer(final String cname, ActiveMQConnectionFactory factory) throws JMSException
	{
		QueueConnection queueConnection1 = factory.createQueueConnection();
		queueConnection1.start();
		QueueSession qs1 = queueConnection1.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
		Queue q1 = qs1.createQueue("TEST");
		MessageConsumer c1 = qs1.createConsumer(q1);
		c1.setMessageListener(new MessageListener() {
			@Override
			public void onMessage(Message msg)
			{
				try
				{
					TextMessage txt = (TextMessage) msg;
					System.out.println(cname + ":" + txt.getText());
					// Simulate a variable processing time of up to one second.
					Thread.sleep((long) (1000 * Math.random()));
				}
				catch (Exception exp)
				{
					exp.printStackTrace();
				}
			}

		});
	}
}




-- 
Regards,
Oleg Dulin
http://www.olegdulin.com

Re: Broker federation and messages not getting forwarded

Posted by Oleg Dulin <ol...@gmail.com>.
Another factoid -- if I don't use "duplex" then all seems to be well.
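
For comparison, a non-duplex layout would presumably use one one-way connector per broker, since a network connector without duplex only forwards messages from the broker that defines it to the remote broker. The fragment below is only a sketch under that assumption: the second connector and the name connector107.106 are illustrative and are not taken from the actual configuration.

```xml
<!-- Broker 106 (sketch): one-way connector, forwards 106 -> 107 -->
<networkConnectors>
    <networkConnector name="connector106.107"
                      uri="static:(tcp://192.168.7.107:3200)"/>
</networkConnectors>

<!-- Broker 107 (assumed, hypothetical name): one-way connector, forwards 107 -> 106 -->
<networkConnectors>
    <networkConnector name="connector107.106"
                      uri="static:(tcp://192.168.7.106:3200)"/>
</networkConnectors>
```

With this layout each direction has its own connection, rather than both directions sharing the single duplex connection opened by 106.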

On 2013-08-14 19:03:36 +0000, Oleg Dulin said:

> One more note -- here is a kicker.
> 
> If I change the consumers to the producer's IP, comment out the producer, 
> and restart the program -- they get all their messages!
> 
> This is driving me nuts!
> 


-- 
Regards,
Oleg Dulin
http://www.olegdulin.com



Re: Broker federation and messages not getting forwarded

Posted by Oleg Dulin <ol...@gmail.com>.
One more note -- here is a kicker.

If I change the consumers to the producer's IP, comment out the producer, 
and restart the program -- they get all their messages!

This is driving me nuts!



-- 
Regards,
Oleg Dulin
http://www.olegdulin.com