You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Jigar Naik <ji...@interactcrm.com> on 2008/08/29 13:04:54 UTC

Consumer stops consuming messages from queue.

hi ,

my producer is producing 10000 messages on a queue (TestQueue).

after my producer completes producing 10000 messages. I am starting my
consumer for consumer 100000 messages. 

My consumer consumes around 6000 messages and than throws exception
indicating no message on ActiveMQ

what could be the possible reason for this ?

I am stumbling with this issue since last 1 week but not able to solve it.
-- 
View this message in context: http://www.nabble.com/Consumer-stops-consuming-messages-from-queue.-tp19217866p19217866.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: Consumer stops consuming messages from queue.

Posted by James Strachan <ja...@gmail.com>.
The issue is arising because you are consuming messages but never
acknowledging them (because you never commit your transaction).

See
http://activemq.apache.org/what-is-the-prefetch-limit-for.html

2008/8/29 Jigar Naik <ji...@interactcrm.com>:
>
>
>
> Yeah true.... i will be using auto_ack now onwards,
>
> one more thing...Is the issue arising because of following pattern of method
> calls... ???
>
>       util.startConsume(queueName,connectionFactory);
> //startConsume(); will be called only in the beginning of the Thread's life.
> startConsume() contains the line connection.start();
>
> for(int i=0;i<this.numberOfMessageToConsume;i++){
> //This will be in endless loop
>        xmlText                                                 = util.consumeFromMessageQueue();
>       ... store message to oracle database....
>
> }
> //endConsumer() will be executed once in the Thread's life..
> util.endConsume(...);
>
>
> James.Strachan wrote:
>>
>> Have raised a JIRA to fail faster to avoid others hitting this one
>> https://issues.apache.org/activemq/browse/AMQ-1919
>>
>> but basically if you use a transacted session, you have to commit() at
>> some point :)
>>
>> 2008/8/29 James Strachan <ja...@gmail.com>:
>>> 2008/8/29 Jigar Naik <ji...@interactcrm.com>:
>>>>
>>>> I am using ActiveMQ version 5.1.0 with JDK 1.5
>>>>
>>>> "are you using transactions?"
>>>>
>>>> I didn't really get your quesion.
>>>
>>> Are you using JMS transactions to consume
>>>
>>>>  but after consuming each messages from
>>>> activeMQ i am inserting it into oracle database table
>>>>
>>>> I am not acknowledging the messages  and i have only one consumer.
>>>>
>>>> which continuously reada messages from the queue, on activeMQ console i
>>>> can
>>>> see Number Of Pending Messages around  4000
>>>>
>>>> bellow is my startConsume method code, which will be called before the
>>>> infinite loop.
>>>>
>>>> public void startConsume(String queueName,String queueConnectionFactory)
>>>> {
>>>>
>>>>                try {
>>>>                        String sms_connect = Utility.getSMSConnectHome();
>>>>                        File jmsProp = new File((new
>>>> StringBuilder(String.valueOf(sms_connect))).append(File.separator).append("config").append(File.separator).append("JMSConfig.properties").toString());
>>>>                        props.load(new FileInputStream(jmsProp));
>>>>                        JmsUtil.CONNECTION_FACTORY =
>>>> queueConnectionFactory;
>>>>                        JmsUtil.QUEUE_NAME = queueName;
>>>>                        System.out.println("Queue Name : " +
>>>> JmsUtil.QUEUE_NAME);
>>>>                        connectionFactory = new
>>>> ActiveMQConnectionFactory(JmsUtil.CONNECTION_FACTORY);
>>>>                        connection =
>>>> connectionFactory.createConnection();
>>>>                        connection.setExceptionListener(this);
>>>>                        session = connection.createSession(false,
>>>> javax.jms.Session.SESSION_TRANSACTED);
>>>
>>> This is an error. We should probably throw an exception here.
>>>
>>> You are saying you want to use transacted session ack mode, but not
>>> using the true parameter to indicate transacted consumption.
>>>
>>> Since you're not using session.commit() or message.acknowledge() to
>>> acknowledge messages - I'd recommend using Session.AUTO_ACK
>>>
>>> --
>>> James
>>> -------
>>> http://macstrac.blogspot.com/
>>>
>>> Open Source Integration
>>> http://open.iona.com
>>>
>>
>>
>>
>> --
>> James
>> -------
>> http://macstrac.blogspot.com/
>>
>> Open Source Integration
>> http://open.iona.com
>>
>>
>
> --
> View this message in context: http://www.nabble.com/Consumer-stops-consuming-messages-from-queue.-tp19217866p19218647.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>



-- 
James
-------
http://macstrac.blogspot.com/

Open Source Integration
http://open.iona.com

Re: Consumer stops consuming messages from queue.

Posted by Jigar Naik <ji...@interactcrm.com>.


Yeah true.... i will be using auto_ack now onwards,

one more thing...Is the issue arising because of following pattern of method
calls... ???
 
       util.startConsume(queueName,connectionFactory);       
//startConsume(); will be called only in the beginning of the Thread's life.
startConsume() contains the line connection.start();

for(int i=0;i<this.numberOfMessageToConsume;i++){
//This will be in endless loop
	xmlText 						= util.consumeFromMessageQueue();
       ... store message to oracle database....

}
//endConsumer() will be executed once in the Thread's life.. 
util.endConsume(...);


James.Strachan wrote:
> 
> Have raised a JIRA to fail faster to avoid others hitting this one
> https://issues.apache.org/activemq/browse/AMQ-1919
> 
> but basically if you use a transacted session, you have to commit() at
> some point :)
> 
> 2008/8/29 James Strachan <ja...@gmail.com>:
>> 2008/8/29 Jigar Naik <ji...@interactcrm.com>:
>>>
>>> I am using ActiveMQ version 5.1.0 with JDK 1.5
>>>
>>> "are you using transactions?"
>>>
>>> I didn't really get your quesion.
>>
>> Are you using JMS transactions to consume
>>
>>>  but after consuming each messages from
>>> activeMQ i am inserting it into oracle database table
>>>
>>> I am not acknowledging the messages  and i have only one consumer.
>>>
>>> which continuously reada messages from the queue, on activeMQ console i
>>> can
>>> see Number Of Pending Messages around  4000
>>>
>>> bellow is my startConsume method code, which will be called before the
>>> infinite loop.
>>>
>>> public void startConsume(String queueName,String queueConnectionFactory)
>>> {
>>>
>>>                try {
>>>                        String sms_connect = Utility.getSMSConnectHome();
>>>                        File jmsProp = new File((new
>>> StringBuilder(String.valueOf(sms_connect))).append(File.separator).append("config").append(File.separator).append("JMSConfig.properties").toString());
>>>                        props.load(new FileInputStream(jmsProp));
>>>                        JmsUtil.CONNECTION_FACTORY =
>>> queueConnectionFactory;
>>>                        JmsUtil.QUEUE_NAME = queueName;
>>>                        System.out.println("Queue Name : " +
>>> JmsUtil.QUEUE_NAME);
>>>                        connectionFactory = new
>>> ActiveMQConnectionFactory(JmsUtil.CONNECTION_FACTORY);
>>>                        connection =
>>> connectionFactory.createConnection();
>>>                        connection.setExceptionListener(this);
>>>                        session = connection.createSession(false,
>>> javax.jms.Session.SESSION_TRANSACTED);
>>
>> This is an error. We should probably throw an exception here.
>>
>> You are saying you want to use transacted session ack mode, but not
>> using the true parameter to indicate transacted consumption.
>>
>> Since you're not using session.commit() or message.acknowledge() to
>> acknowledge messages - I'd recommend using Session.AUTO_ACK
>>
>> --
>> James
>> -------
>> http://macstrac.blogspot.com/
>>
>> Open Source Integration
>> http://open.iona.com
>>
> 
> 
> 
> -- 
> James
> -------
> http://macstrac.blogspot.com/
> 
> Open Source Integration
> http://open.iona.com
> 
> 

-- 
View this message in context: http://www.nabble.com/Consumer-stops-consuming-messages-from-queue.-tp19217866p19218647.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: Consumer stops consuming messages from queue.

Posted by James Strachan <ja...@gmail.com>.
Have raised a JIRA to fail faster to avoid others hitting this one
https://issues.apache.org/activemq/browse/AMQ-1919

but basically if you use a transacted session, you have to commit() at
some point :)

2008/8/29 James Strachan <ja...@gmail.com>:
> 2008/8/29 Jigar Naik <ji...@interactcrm.com>:
>>
>> I am using ActiveMQ version 5.1.0 with JDK 1.5
>>
>> "are you using transactions?"
>>
>> I didn't really get your quesion.
>
> Are you using JMS transactions to consume
>
>>  but after consuming each messages from
>> activeMQ i am inserting it into oracle database table
>>
>> I am not acknowledging the messages  and i have only one consumer.
>>
>> which continuously reada messages from the queue, on activeMQ console i can
>> see Number Of Pending Messages around  4000
>>
>> bellow is my startConsume method code, which will be called before the
>> infinite loop.
>>
>> public void startConsume(String queueName,String queueConnectionFactory) {
>>
>>                try {
>>                        String sms_connect = Utility.getSMSConnectHome();
>>                        File jmsProp = new File((new
>> StringBuilder(String.valueOf(sms_connect))).append(File.separator).append("config").append(File.separator).append("JMSConfig.properties").toString());
>>                        props.load(new FileInputStream(jmsProp));
>>                        JmsUtil.CONNECTION_FACTORY = queueConnectionFactory;
>>                        JmsUtil.QUEUE_NAME = queueName;
>>                        System.out.println("Queue Name : " + JmsUtil.QUEUE_NAME);
>>                        connectionFactory = new
>> ActiveMQConnectionFactory(JmsUtil.CONNECTION_FACTORY);
>>                        connection = connectionFactory.createConnection();
>>                        connection.setExceptionListener(this);
>>                        session = connection.createSession(false,
>> javax.jms.Session.SESSION_TRANSACTED);
>
> This is an error. We should probably throw an exception here.
>
> You are saying you want to use transacted session ack mode, but not
> using the true parameter to indicate transacted consumption.
>
> Since you're not using session.commit() or message.acknowledge() to
> acknowledge messages - I'd recommend using Session.AUTO_ACK
>
> --
> James
> -------
> http://macstrac.blogspot.com/
>
> Open Source Integration
> http://open.iona.com
>



-- 
James
-------
http://macstrac.blogspot.com/

Open Source Integration
http://open.iona.com

Re: Consumer stops consuming messages from queue.

Posted by James Strachan <ja...@gmail.com>.
2008/8/29 Jigar Naik <ji...@interactcrm.com>:
>
> I am using ActiveMQ version 5.1.0 with JDK 1.5
>
> "are you using transactions?"
>
> I didn't really get your quesion.

Are you using JMS transactions to consume

>  but after consuming each messages from
> activeMQ i am inserting it into oracle database table
>
> I am not acknowledging the messages  and i have only one consumer.
>
> which continuously reada messages from the queue, on activeMQ console i can
> see Number Of Pending Messages around  4000
>
> bellow is my startConsume method code, which will be called before the
> infinite loop.
>
> public void startConsume(String queueName,String queueConnectionFactory) {
>
>                try {
>                        String sms_connect = Utility.getSMSConnectHome();
>                        File jmsProp = new File((new
> StringBuilder(String.valueOf(sms_connect))).append(File.separator).append("config").append(File.separator).append("JMSConfig.properties").toString());
>                        props.load(new FileInputStream(jmsProp));
>                        JmsUtil.CONNECTION_FACTORY = queueConnectionFactory;
>                        JmsUtil.QUEUE_NAME = queueName;
>                        System.out.println("Queue Name : " + JmsUtil.QUEUE_NAME);
>                        connectionFactory = new
> ActiveMQConnectionFactory(JmsUtil.CONNECTION_FACTORY);
>                        connection = connectionFactory.createConnection();
>                        connection.setExceptionListener(this);
>                        session = connection.createSession(false,
> javax.jms.Session.SESSION_TRANSACTED);

This is an error. We should probably throw an exception here.

You are saying you want to use transacted session ack mode, but not
using the true parameter to indicate transacted consumption.

Since you're not using session.commit() or message.acknowledge() to
acknowledge messages - I'd recommend using Session.AUTO_ACK

-- 
James
-------
http://macstrac.blogspot.com/

Open Source Integration
http://open.iona.com

Re: Consumer stops consuming messages from queue.

Posted by Jigar Naik <ji...@interactcrm.com>.
I am using ActiveMQ version 5.1.0 with JDK 1.5

"are you using transactions?"

I didn't really get your quesion. but after consuming each messages from
activeMQ i am inserting it into oracle database table

I am not acknowledging the messages  and i have only one consumer.

which continuously reada messages from the queue, on activeMQ console i can
see Number Of Pending Messages around  4000 

bellow is my startConsume method code, which will be called before the
infinite loop.

public void startConsume(String queueName,String queueConnectionFactory) {

		try {
			String sms_connect = Utility.getSMSConnectHome();
			File jmsProp = new File((new
StringBuilder(String.valueOf(sms_connect))).append(File.separator).append("config").append(File.separator).append("JMSConfig.properties").toString());
			props.load(new FileInputStream(jmsProp));
			JmsUtil.CONNECTION_FACTORY = queueConnectionFactory;
			JmsUtil.QUEUE_NAME = queueName;
			System.out.println("Queue Name : " + JmsUtil.QUEUE_NAME);
			connectionFactory = new
ActiveMQConnectionFactory(JmsUtil.CONNECTION_FACTORY);
			connection = connectionFactory.createConnection();
			connection.setExceptionListener(this);
			session = connection.createSession(false,
javax.jms.Session.SESSION_TRANSACTED);
			destination = session.createQueue(JmsUtil.QUEUE_NAME);
			consumer = session.createConsumer(destination);
			connection.start();
			
		} catch (IOException e) {
			try {
				write("startConsume", e.getMessage(), "DQM", 4);
				fe.write(e, "startConsume");
			} catch (IOException ioe) {
			}

		} catch (JMSException e) {
			try {
				write("startConsume", e.getMessage(), "DQM", 4);
				fe.write(e, "startConsume");
			} catch (IOException ioe) {
			}
		}
	}
	
	
Bellow is my consumeFromActieMQ method. which will be called in loop

public String consumeFromMessageQueue() {
		try {
			message = consumer.receive(100);
			textMessage = (TextMessage) message;
			text = textMessage.getText();
		} catch (JMSException e) {
			try {
				write("consumeFromMessageQueue", "No message on ActiveMQ", "DQM", 2);
				write("consumeFromMessageQueue", e.getMessage(), "DQM", 4);
				fe.write(e, "consumeFromMessageQueue");
			} catch (IOException ioe) {
			}
		}
		return text;
	}
	

and bellow method i am calling when i stop the thread. that is aftr the end
of loop.

public void endConsume() {
		try {
			if (consumer != null) {
				consumer.close();
			}
			if (session != null) {
				session.close();
			}
			if (connection != null) {
				connection.close();
			}
		} catch (JMSException e) {
			try {
				write("endConsume", e.getMessage(), "DQM", 4);
				fe.write(e, "endConsume");
			} catch (IOException ioe) {
			}

		}
	}
	

	
	 http://www.nabble.com/file/p19218230/JmsUtil.java JmsUtil.java 



my first method will be executed before starting reading the 10000 messages.

second method will be executed in loop which is responsible for consuming
messages.

and the third method will be executed when i stop the thread. may be after 1
or 2 or 10 days....


James.Strachan wrote:
> 
> 2008/8/29 Jigar Naik <ji...@interactcrm.com>:
>>
>> hi ,
>>
>> my producer is producing 10000 messages on a queue (TestQueue).
>>
>> after my producer completes producing 10000 messages. I am starting my
>> consumer for consumer 100000 messages.
>>
>> My consumer consumes around 6000 messages and than throws exception
>> indicating no message on ActiveMQ
>>
>> what could be the possible reason for this ?
>>
>> I am stumbling with this issue since last 1 week but not able to solve
>> it.
> 
> Some tips about getting help...
> http://activemq.apache.org/support.html
> 
> e.g. what version are you using? are you using transactions? Do you
> acknowledge the messages? Do you have any other consumers?
> 
> -- 
> James
> -------
> http://macstrac.blogspot.com/
> 
> Open Source Integration
> http://open.iona.com
> 
> 

-- 
View this message in context: http://www.nabble.com/Consumer-stops-consuming-messages-from-queue.-tp19217866p19218230.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: Consumer stops consuming messages from queue.

Posted by James Strachan <ja...@gmail.com>.
2008/8/29 Jigar Naik <ji...@interactcrm.com>:
>
> hi ,
>
> my producer is producing 10000 messages on a queue (TestQueue).
>
> after my producer completes producing 10000 messages. I am starting my
> consumer for consumer 100000 messages.
>
> My consumer consumes around 6000 messages and than throws exception
> indicating no message on ActiveMQ
>
> what could be the possible reason for this ?
>
> I am stumbling with this issue since last 1 week but not able to solve it.

Some tips about getting help...
http://activemq.apache.org/support.html

e.g. what version are you using? are you using transactions? Do you
acknowledge the messages? Do you have any other consumers?

-- 
James
-------
http://macstrac.blogspot.com/

Open Source Integration
http://open.iona.com