You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by osian <os...@osian.me.uk> on 2006/05/05 08:51:31 UTC

Setting up ActiveMQ using java1.5 and ActiveMQ4.0RC2

Hi all,

I am currently looking at using ActiveMQ as our message broker but It seems
to hang on a regular basis.  In my test environment I have 2 brokers
clustered together, an oracle DB behind them for the journaling, and then 3
consumers 2 doing specific queues, and another being able to process any
queue. Also, on one of the machines, it scans a directory for files and then
converts the found files into a JMS message to be processed.
On the first run through, it processed a 1000 files and it seemed ok, I then
ran multiple threads to process multiple queues on each consumer machine,
and it seemed to hang intermittently, due to this I abandoned this idea and
went back to the first scenario, so to test it fully, I put 10,000 files in
the directory and left it running overnight, I came in to find that it had
only picked up 3,000 files, processed 177 messages, and there are 2,958
messages sitting in ACTIVEMQ_MSGS table, and the consumers are sitting there
doing nothing.  If I stop and start the consumers, they process one message,
and then hang again, but if I only run one consumer, it starts processing
messages for a while, and then hangs again.
I believe that this must be a setup problem and ActiveMQ has everything that
I need so I would love to use it. If anyone has any ActiveMQ configuration
suggestions or code samples for the consumers, producers, etc. I would be
very greatful,

Kind regards,
Osian

Here is my activemq.xml file:
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns:amq="http://activemq.org/config/1.0">

    <amq:broker brokerName="ProactJMSBroker" useJmx="true"
useShutdownHook="true" persistent="true" deleteAllMessagesOnStartup="false">

		<amq:transportConnectors>
			<amq:transportConnector uri="tcp://localhost:61616"
discoveryUri="multicast://ProactJMSService"/>
		</amq:transportConnectors>
 
		<amq:networkConnectors>
			<amq:networkConnector uri="multicast://ProactJMSService"/>
		</amq:networkConnectors>

		<amq:persistenceAdapter>
			<amq:jdbcPersistenceAdapter>
				<property name="cleanupPeriod" value="600000"/>
				<property name="dataSource" ref="oracle-ds"/>
			</amq:jdbcPersistenceAdapter>
		</amq:persistenceAdapter>
    </amq:broker>

    <!--
==================================================================== -->
    <!-- JDBC DataSource Configurations -->
    <!--
==================================================================== -->

    <!-- The Datasource that will be used by the Broker -->
	<bean id="oracle-ds" class="net.proact.scm.sql.ProactPoolingDataSource">
		<property name="url" value="jdbc:oracle:oci:@CNHDEV"/>
		<property name="userName" value="CNHDEV"/>
		<property name="password" value="CNHDEV"/>
	</bean>

</beans>

Here is some sample code for the consumer:
	public void runConsumer() {
		try {
			Connection connection = createConnection(getURL());
			connection.setExceptionListener(this);
			session = createSession(connection);
			MessageConsumer consumer = session.createConsumer(getDestination(session,
getSubject()));
			
			consumeMessagesAndClose(connection, consumer, timeOut);
		}
		catch (Exception e) {
			System.out.println("Caught: " + e);
			e.printStackTrace();
			System.exit(-1);
		}
	}

    public static Connection createConnection(String url) throws
JMSException, Exception {
        ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(getUser(), getPassword(), url);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        return connection;
    }

    public static Session createSession(Connection connection) throws
Exception {
        Session session = connection.createSession(true,
Session.CLIENT_ACKNOWLEDGE);
        return session;
    }

    public Destination getDestination(Session session, String queueName)
throws Exception {
    	if (destination == null) {
    		destination = createQueue(session, queueName);
    	}
    	return destination;
    }

    private void consumeMessagesAndClose(Connection connection,
MessageConsumer consumer, long timeout) throws JMSException {
        System.out.println("Consumer (" + myConsumerName + ") will consume
messages for queue '" + getSubject() + "' while they continue to be
delivered within: " + timeout + " ms");

        Message message;
        while (true) {
        	if ((message = consumer.receive(timeout)) != null) {
        		onMessage(message);
        		message.acknowledge();
        		session.commit();
        	}
        	System.gc();
        }
    }

	public void onMessage(Message arg0) {
		if (arg0 instanceof ActiveMQObjectMessage) {
			long start = System.currentTimeMillis();
			ActiveMQObjectMessage message = (ActiveMQObjectMessage) arg0;
			
			try {
				if (message.getObject() instanceof JMSMessageInterface) {
					JMSMessageInterface myMessage = (JMSMessageInterface)
message.getObject();
					boolean success = myMessage.processMessage(getEditingContext());
					if (success) {
						System.err.println("Success : " + ModelConstants.LINE_SEPARATOR);
					}
					else {
						System.err.println("Failed : " + ModelConstants.LINE_SEPARATOR);
					}
					System.err.println(myMessage.toStringDescription());
					long complete = System.currentTimeMillis();
				}
			} catch (JMSException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (UnknownHostException uhe) {
				uhe.printStackTrace();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}


And for the producer:
    public void run() {
        try {
            
        	File baseDirectory = new File(Config.getEDIBaseDir(
getEditingContext() ));
        	
        	if (!baseDirectory.exists()) {
        		baseDirectory.mkdirs();
        	}
        	File inDirectory = new File(baseDirectory, "In");
        	File pickedUpDirectory = new File(baseDirectory, "PickedUp");
        	if (!inDirectory.exists()) {
        		inDirectory.mkdirs();
        	}
        	if (!pickedUpDirectory.exists()) {
        		pickedUpDirectory.mkdirs();
        	}
        	
            Connection connection = createConnection(getURL());
            Session session = createSession(connection);
            MessageProducer producer = createProducer(timeToLive, session,
getDestination(session, getSubject()));
            //sendLoop(session, producer);
            
            while (connection != null) {
                try {
                	
                	File[] filesFound = inDirectory.listFiles();
                	Arrays.sort(filesFound, DATE_COMPARE);
                	
                    for (File foundFile : filesFound) {
                    	File pickedUpFile = new File(pickedUpDirectory,
foundFile.getName());
                    	EDIFile ediFile =
EDIManager.getEDIFileForImport(getEditingContext(), foundFile.getName());
                    	if (ediFile != null) {
                    		sendMessage(session, producer, ediFile, new
LineNumberReader(new FileReader(foundFile)), foundFile.getName());
                    	}
                        foundFile.renameTo(pickedUpFile);
                    }        
                }
                catch (Exception e) {
                    CoreLogger.println("Exception : "+e);
                    e.printStackTrace();
                }
                Thread.sleep(500);
            }

            System.out.println("Done.");
            close(connection, session);
        }
        catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }x
--
View this message in context: http://www.nabble.com/Setting-up-ActiveMQ-using-java1.5-and-ActiveMQ4.0RC2-t1562133.html#a4242459
Sent from the ActiveMQ - User forum at Nabble.com.


Re: Setting up ActiveMQ using java1.5 and ActiveMQ4.0RC2

Posted by osian <os...@osian.me.uk>.
Will do, thanks.  Im also looking at using FUSE, so I shall start my test
again (start simple then add complexity...)
--
View this message in context: http://www.nabble.com/Setting-up-ActiveMQ-using-java1.5-and-ActiveMQ4.0RC2-t1562133.html#a4243494
Sent from the ActiveMQ - User forum at Nabble.com.


Re: Setting up ActiveMQ using java1.5 and ActiveMQ4.0RC2

Posted by James Strachan <ja...@gmail.com>.
On 5/5/06, osian <os...@osian.me.uk> wrote:
>
> No, both brokers are talking to the same database, should they have
> individual DB's?

Yes. That explains the duplicate primary keys :)


> Now that I am running a single broker, in the JConsole window, I can see
> both queues, one has been fully processed, and the other is at a size of
> 2953, with a deqeue count of 500 and 3 consumers connected to it.  This
> queue is being processed, and should be publishing onto the other queue, but
> the size of the queue is not coming down and it isn't publishing onto the
> other queue (this was working).  This is most strange. Saying that, I have
> just checked the machines again and they are processing both messages, but
> the JConsole doesn't show that this is happening, whats going on????? Am I
> going mad?
> Should I be using topic's instead of queues, or durable subscribers?? All I
> need is a guarantee that a message is processed, and processed only once.

Queues should be fine. I'd maybe zap the database and start again with
just 1 broker working against it until you're happy you've got your
setup working with queues.

--

James
-------
http://radio.weblogs.com/0112098/

Re: Setting up ActiveMQ using java1.5 and ActiveMQ4.0RC2

Posted by osian <os...@osian.me.uk>.
No, both brokers are talking to the same database, should they have
individual DB's?

Now that I am running a single broker, in the JConsole window, I can see
both queues, one has been fully processed, and the other is at a size of
2953, with a deqeue count of 500 and 3 consumers connected to it.  This
queue is being processed, and should be publishing onto the other queue, but
the size of the queue is not coming down and it isn't publishing onto the
other queue (this was working).  This is most strange. Saying that, I have
just checked the machines again and they are processing both messages, but
the JConsole doesn't show that this is happening, whats going on????? Am I
going mad?
Should I be using topic's instead of queues, or durable subscribers?? All I
need is a guarantee that a message is processed, and processed only once.
--
View this message in context: http://www.nabble.com/Setting-up-ActiveMQ-using-java1.5-and-ActiveMQ4.0RC2-t1562133.html#a4243235
Sent from the ActiveMQ - User forum at Nabble.com.


Re: Setting up ActiveMQ using java1.5 and ActiveMQ4.0RC2

Posted by James Strachan <ja...@gmail.com>.
On 5/5/06, osian <os...@osian.me.uk> wrote:
> I do get a lot of errors on the broker with regards to writting messages to
> the journal, i.e. duplicate primary keys.

This is most strange. You are using a separate database for each broker?

> It also seems to process some
> messages twice (which isn't good). I will try running it with just one
> broker and let you know how I get on, but I will need to run it with
> multiple brokers due to the fact that I will be publishing millions or
> messages through the broker in a very short period of time.

FWIW one broker can handle millions of messages in a short space of
time, particularly if you are using lots of concurrent producers and
consumers (or you enable async sending on producers).

My recommendation to try 1 broker first was to check your software and
the configuration is working fine before you complicate matters with
networks or master/slave etc.


> What information should I be looking for in the JMX console?

The size of queues and state of clients etc.

> Also, is it
> possible to make the broker log to a file instead of the console? at least
> then I can capture all of the information for you.

We use commons-logging, so you can change the log4j.properties file to
log to a file or console or both.

http://logging.apache.org/log4j/docs/manual.html
--

James
-------
http://radio.weblogs.com/0112098/

Re: Setting up ActiveMQ using java1.5 and ActiveMQ4.0RC2

Posted by osian <os...@osian.me.uk>.
I do get a lot of errors on the broker with regards to writting messages to
the journal, i.e. duplicate primary keys.  It also seems to process some
messages twice (which isn't good). I will try running it with just one
broker and let you know how I get on, but I will need to run it with
multiple brokers due to the fact that I will be publishing millions or
messages through the broker in a very short period of time.
What information should I be looking for in the JMX console?  Also, is it
possible to make the broker log to a file instead of the console? at least
then I can capture all of the information for you.
--
View this message in context: http://www.nabble.com/Setting-up-ActiveMQ-using-java1.5-and-ActiveMQ4.0RC2-t1562133.html#a4242747
Sent from the ActiveMQ - User forum at Nabble.com.


Re: Setting up ActiveMQ using java1.5 and ActiveMQ4.0RC2

Posted by James Strachan <ja...@gmail.com>.
Does your code work OK if you just use one broker? When things appear
to 'hang' how do things look in JMX? Is there any warnings/errors in
the log of the clients or brokers?

On 5/5/06, osian <os...@osian.me.uk> wrote:
>
> Hi all,
>
> I am currently looking at using ActiveMQ as our message broker but It seems
> to hang on a regular basis.  In my test environment I have 2 brokers
> clustered together, an oracle DB behind them for the journaling, and then 3
> consumers 2 doing specific queues, and another being able to process any
> queue. Also, on one of the machines, it scans a directory for files and then
> converts the found files into a JMS message to be processed.
> On the first run through, it processed a 1000 files and it seemed ok, I then
> ran multiple threads to process multiple queues on each consumer machine,
> and it seemed to hang intermittently, due to this I abandoned this idea and
> went back to the first scenario, so to test it fully, I put 10,000 files in
> the directory and left it running overnight, I came in to find that it had
> only picked up 3,000 files, processed 177 messages, and there are 2,958
> messages sitting in ACTIVEMQ_MSGS table, and the consumers are sitting there
> doing nothing.  If I stop and start the consumers, they process one message,
> and then hang again, but if I only run one consumer, it starts processing
> messages for a while, and then hangs again.
> I believe that this must be a setup problem and ActiveMQ has everything that
> I need so I would love to use it. If anyone has any ActiveMQ configuration
> suggestions or code samples for the consumers, producers, etc. I would be
> very greatful,
>
> Kind regards,
> Osian
>
> Here is my activemq.xml file:
> <?xml version="1.0" encoding="UTF-8"?>
>
> <beans xmlns:amq="http://activemq.org/config/1.0">
>
>     <amq:broker brokerName="ProactJMSBroker" useJmx="true"
> useShutdownHook="true" persistent="true" deleteAllMessagesOnStartup="false">
>
>                 <amq:transportConnectors>
>                         <amq:transportConnector uri="tcp://localhost:61616"
> discoveryUri="multicast://ProactJMSService"/>
>                 </amq:transportConnectors>
>
>                 <amq:networkConnectors>
>                         <amq:networkConnector uri="multicast://ProactJMSService"/>
>                 </amq:networkConnectors>
>
>                 <amq:persistenceAdapter>
>                         <amq:jdbcPersistenceAdapter>
>                                 <property name="cleanupPeriod" value="600000"/>
>                                 <property name="dataSource" ref="oracle-ds"/>
>                         </amq:jdbcPersistenceAdapter>
>                 </amq:persistenceAdapter>
>     </amq:broker>
>
>     <!--
> ==================================================================== -->
>     <!-- JDBC DataSource Configurations -->
>     <!--
> ==================================================================== -->
>
>     <!-- The Datasource that will be used by the Broker -->
>         <bean id="oracle-ds" class="net.proact.scm.sql.ProactPoolingDataSource">
>                 <property name="url" value="jdbc:oracle:oci:@CNHDEV"/>
>                 <property name="userName" value="CNHDEV"/>
>                 <property name="password" value="CNHDEV"/>
>         </bean>
>
> </beans>
>
> Here is some sample code for the consumer:
>         public void runConsumer() {
>                 try {
>                         Connection connection = createConnection(getURL());
>                         connection.setExceptionListener(this);
>                         session = createSession(connection);
>                         MessageConsumer consumer = session.createConsumer(getDestination(session,
> getSubject()));
>
>                         consumeMessagesAndClose(connection, consumer, timeOut);
>                 }
>                 catch (Exception e) {
>                         System.out.println("Caught: " + e);
>                         e.printStackTrace();
>                         System.exit(-1);
>                 }
>         }
>
>     public static Connection createConnection(String url) throws
> JMSException, Exception {
>         ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory(getUser(), getPassword(), url);
>         Connection connection = connectionFactory.createConnection();
>         connection.start();
>         return connection;
>     }
>
>     public static Session createSession(Connection connection) throws
> Exception {
>         Session session = connection.createSession(true,
> Session.CLIENT_ACKNOWLEDGE);
>         return session;
>     }
>
>     public Destination getDestination(Session session, String queueName)
> throws Exception {
>         if (destination == null) {
>                 destination = createQueue(session, queueName);
>         }
>         return destination;
>     }
>
>     private void consumeMessagesAndClose(Connection connection,
> MessageConsumer consumer, long timeout) throws JMSException {
>         System.out.println("Consumer (" + myConsumerName + ") will consume
> messages for queue '" + getSubject() + "' while they continue to be
> delivered within: " + timeout + " ms");
>
>         Message message;
>         while (true) {
>                 if ((message = consumer.receive(timeout)) != null) {
>                         onMessage(message);
>                         message.acknowledge();
>                         session.commit();
>                 }
>                 System.gc();
>         }
>     }
>
>         public void onMessage(Message arg0) {
>                 if (arg0 instanceof ActiveMQObjectMessage) {
>                         long start = System.currentTimeMillis();
>                         ActiveMQObjectMessage message = (ActiveMQObjectMessage) arg0;
>
>                         try {
>                                 if (message.getObject() instanceof JMSMessageInterface) {
>                                         JMSMessageInterface myMessage = (JMSMessageInterface)
> message.getObject();
>                                         boolean success = myMessage.processMessage(getEditingContext());
>                                         if (success) {
>                                                 System.err.println("Success : " + ModelConstants.LINE_SEPARATOR);
>                                         }
>                                         else {
>                                                 System.err.println("Failed : " + ModelConstants.LINE_SEPARATOR);
>                                         }
>                                         System.err.println(myMessage.toStringDescription());
>                                         long complete = System.currentTimeMillis();
>                                 }
>                         } catch (JMSException e) {
>                                 // TODO Auto-generated catch block
>                                 e.printStackTrace();
>                         } catch (UnknownHostException uhe) {
>                                 uhe.printStackTrace();
>                         } catch (Exception e) {
>                                 e.printStackTrace();
>                         }
>                 }
>         }
>
>
> And for the producer:
>     public void run() {
>         try {
>
>                 File baseDirectory = new File(Config.getEDIBaseDir(
> getEditingContext() ));
>
>                 if (!baseDirectory.exists()) {
>                         baseDirectory.mkdirs();
>                 }
>                 File inDirectory = new File(baseDirectory, "In");
>                 File pickedUpDirectory = new File(baseDirectory, "PickedUp");
>                 if (!inDirectory.exists()) {
>                         inDirectory.mkdirs();
>                 }
>                 if (!pickedUpDirectory.exists()) {
>                         pickedUpDirectory.mkdirs();
>                 }
>
>             Connection connection = createConnection(getURL());
>             Session session = createSession(connection);
>             MessageProducer producer = createProducer(timeToLive, session,
> getDestination(session, getSubject()));
>             //sendLoop(session, producer);
>
>             while (connection != null) {
>                 try {
>
>                         File[] filesFound = inDirectory.listFiles();
>                         Arrays.sort(filesFound, DATE_COMPARE);
>
>                     for (File foundFile : filesFound) {
>                         File pickedUpFile = new File(pickedUpDirectory,
> foundFile.getName());
>                         EDIFile ediFile =
> EDIManager.getEDIFileForImport(getEditingContext(), foundFile.getName());
>                         if (ediFile != null) {
>                                 sendMessage(session, producer, ediFile, new
> LineNumberReader(new FileReader(foundFile)), foundFile.getName());
>                         }
>                         foundFile.renameTo(pickedUpFile);
>                     }
>                 }
>                 catch (Exception e) {
>                     CoreLogger.println("Exception : "+e);
>                     e.printStackTrace();
>                 }
>                 Thread.sleep(500);
>             }
>
>             System.out.println("Done.");
>             close(connection, session);
>         }
>         catch (Exception e) {
>             System.out.println("Caught: " + e);
>             e.printStackTrace();
>         }
>     }x
> --
> View this message in context: http://www.nabble.com/Setting-up-ActiveMQ-using-java1.5-and-ActiveMQ4.0RC2-t1562133.html#a4242459
> Sent from the ActiveMQ - User forum at Nabble.com.
>
>


--

James
-------
http://radio.weblogs.com/0112098/