You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by Green <el...@163.com> on 2017/03/07 09:44:43 UTC

Message Priority test

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory; 

public class Program {

	public static final String BROKERURI = "tcp://127.0.0.1:61616";
	public static final String TOPIC = "TEST";
	
	public static final int HIGH = 7;
	public static final int ABOVELOW = 3;

	public static void main(String[] args) throws Exception {
		Program program = new Program();
		program.startListening();
		program.sendMessage();
	}

	public void sendMessage() throws JMSException {
		
		ConnectionFactory factory = new ActiveMQConnectionFactory(BROKERURI);
		Connection connection = factory.createConnection();
		connection.start();

		Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
		Destination destination = session.createQueue(TOPIC);
		MessageProducer producer = session.createProducer(destination);

		for (int l = 0; l < 10; l++) {
			if (l % 2 == 0) {
				producer.setPriority(HIGH);
			} else {
				producer.setPriority(ABOVELOW);
			}
			String text = String.format("message%s priority: %s",
l,producer.getPriority());
			producer.send(session.createTextMessage(text));
		}
		session.close();
		connection.close();
	}

	public void startListening() throws JMSException {

		ConnectionFactory factory = new ActiveMQConnectionFactory(BROKERURI);
		Connection connection = factory.createConnection();
		connection.start();

		Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
		Destination destination = session.createQueue(TOPIC);
		MessageConsumer consumer = session.createConsumer(destination);
		consumer.setMessageListener(new MessageListener() {
			public void onMessage(Message msg) {
				TextMessage message = (TextMessage) msg;
				try {
					System.out.println(message.getText());
				} catch (JMSException ex) {
					ex.printStackTrace();
				}
				try {
					Thread.sleep(100);
				} catch (InterruptedException ex) {
					ex.printStackTrace();
				}
			}
		});

	}

}
----------------------
java it is work
but C# it is not work
----------------------
 using Apache.NMS;
    using Apache.NMS.ActiveMQ;
    using System;
    using System.Threading;
    class Program {
        private const string BLORKURI = "tcp://127.0.0.1:61616";
        private const string TOPIC = "TEST"; 

        static void Main(string[] args) {
            Program program = new Program();
            program.startListening();
            program.sendMessage();
            Console.Read();
        }
        public void sendMessage() {
            IConnectionFactory factory = new ConnectionFactory(BLORKURI);
            IConnection connection = factory.CreateConnection();
            connection.Start();

            ISession session =
connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
            IDestination destination = session.GetQueue(TOPIC);
            IMessageProducer producer = session.CreateProducer(destination);

            for (int l = 1; l <= 10; l++) {
                if (l % 2 == 0) { 
                    producer.Priority = Apache.NMS.MsgPriority.High;
                } else {
                    producer.Priority = Apache.NMS.MsgPriority.AboveLow; 
                }
                string text = string.Format("message{0} priority: {1}", l,
producer.Priority);
                producer.Send(session.CreateTextMessage(text));
            }
            session.Close();
            connection.Close();
        }
        public void startListening() {
            IConnectionFactory factory = new ConnectionFactory(BLORKURI);
            IConnection connection = factory.CreateConnection();
            connection.Start();

            ISession session =
connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
            IDestination destination = session.GetQueue(TOPIC);
            IMessageConsumer consumer = session.CreateConsumer(destination);

            consumer.Listener += new MessageListener(handleMessage);

        }
        protected virtual void handleMessage(Apache.NMS.IMessage message) {
            ITextMessage msg = message as ITextMessage;
            Console.WriteLine(msg.Text);
            Thread.Sleep(1000);
        } 
    }
 



--
View this message in context: http://activemq.2283324.n4.nabble.com/Message-Priority-test-tp4723158.html
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.

Re: Message Priority test

Posted by Timothy Bish <ta...@gmail.com>.
On 03/07/2017 09:00 PM, Green wrote:
> get it
>
> connection.MessagePrioritySupported = true
>
>
>
>
Just note that if you haven't enabled priority support on the broker 
side than this only applies priority ordering to messages that sit in 
the prefetch buffer, so if the buffer is not holding a backlog you won't 
see priority ordering from the arriving messages.


-- 
Tim Bish
twitter: @tabish121
blog: http://timbish.blogspot.com/


Re: Message Priority test

Posted by Green <el...@163.com>.
get it 

connection.MessagePrioritySupported = true



--
View this message in context: http://activemq.2283324.n4.nabble.com/Message-Priority-test-tp4723158p4723280.html
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.

Re: Message Priority test

Posted by Timothy Bish <ta...@gmail.com>.
Care to elaborate on what works or doesn't work means in your case?  
Have you enabled message priority support on the broker?

http://activemq.apache.org/how-can-i-support-priority-queues.html

Client side

Java
https://github.com/apache/activemq/blob/master/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java#L173

.NET
https://svn.apache.org/repos/asf/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/ConnectionFactory.cs

On 03/07/2017 04:44 AM, Green wrote:
> import javax.jms.Connection;
> import javax.jms.ConnectionFactory;
> import javax.jms.Destination;
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.MessageProducer;
> import javax.jms.Session;
> import javax.jms.TextMessage;
>
> import org.apache.activemq.ActiveMQConnectionFactory;
>
> public class Program {
>
> 	public static final String BROKERURI = "tcp://127.0.0.1:61616";
> 	public static final String TOPIC = "TEST";
> 	
> 	public static final int HIGH = 7;
> 	public static final int ABOVELOW = 3;
>
> 	public static void main(String[] args) throws Exception {
> 		Program program = new Program();
> 		program.startListening();
> 		program.sendMessage();
> 	}
>
> 	public void sendMessage() throws JMSException {
> 		
> 		ConnectionFactory factory = new ActiveMQConnectionFactory(BROKERURI);
> 		Connection connection = factory.createConnection();
> 		connection.start();
>
> 		Session session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
> 		Destination destination = session.createQueue(TOPIC);
> 		MessageProducer producer = session.createProducer(destination);
>
> 		for (int l = 0; l < 10; l++) {
> 			if (l % 2 == 0) {
> 				producer.setPriority(HIGH);
> 			} else {
> 				producer.setPriority(ABOVELOW);
> 			}
> 			String text = String.format("message%s priority: %s",
> l,producer.getPriority());
> 			producer.send(session.createTextMessage(text));
> 		}
> 		session.close();
> 		connection.close();
> 	}
>
> 	public void startListening() throws JMSException {
>
> 		ConnectionFactory factory = new ActiveMQConnectionFactory(BROKERURI);
> 		Connection connection = factory.createConnection();
> 		connection.start();
>
> 		Session session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
> 		Destination destination = session.createQueue(TOPIC);
> 		MessageConsumer consumer = session.createConsumer(destination);
> 		consumer.setMessageListener(new MessageListener() {
> 			public void onMessage(Message msg) {
> 				TextMessage message = (TextMessage) msg;
> 				try {
> 					System.out.println(message.getText());
> 				} catch (JMSException ex) {
> 					ex.printStackTrace();
> 				}
> 				try {
> 					Thread.sleep(100);
> 				} catch (InterruptedException ex) {
> 					ex.printStackTrace();
> 				}
> 			}
> 		});
>
> 	}
>
> }
> ----------------------
> java it is work
> but C# it is not work
> ----------------------
>   using Apache.NMS;
>      using Apache.NMS.ActiveMQ;
>      using System;
>      using System.Threading;
>      class Program {
>          private const string BLORKURI = "tcp://127.0.0.1:61616";
>          private const string TOPIC = "TEST";
>
>          static void Main(string[] args) {
>              Program program = new Program();
>              program.startListening();
>              program.sendMessage();
>              Console.Read();
>          }
>          public void sendMessage() {
>              IConnectionFactory factory = new ConnectionFactory(BLORKURI);
>              IConnection connection = factory.CreateConnection();
>              connection.Start();
>
>              ISession session =
> connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
>              IDestination destination = session.GetQueue(TOPIC);
>              IMessageProducer producer = session.CreateProducer(destination);
>
>              for (int l = 1; l <= 10; l++) {
>                  if (l % 2 == 0) {
>                      producer.Priority = Apache.NMS.MsgPriority.High;
>                  } else {
>                      producer.Priority = Apache.NMS.MsgPriority.AboveLow;
>                  }
>                  string text = string.Format("message{0} priority: {1}", l,
> producer.Priority);
>                  producer.Send(session.CreateTextMessage(text));
>              }
>              session.Close();
>              connection.Close();
>          }
>          public void startListening() {
>              IConnectionFactory factory = new ConnectionFactory(BLORKURI);
>              IConnection connection = factory.CreateConnection();
>              connection.Start();
>
>              ISession session =
> connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
>              IDestination destination = session.GetQueue(TOPIC);
>              IMessageConsumer consumer = session.CreateConsumer(destination);
>
>              consumer.Listener += new MessageListener(handleMessage);
>
>          }
>          protected virtual void handleMessage(Apache.NMS.IMessage message) {
>              ITextMessage msg = message as ITextMessage;
>              Console.WriteLine(msg.Text);
>              Thread.Sleep(1000);
>          }
>      }
>   
>
>
>
> --
> View this message in context: http://activemq.2283324.n4.nabble.com/Message-Priority-test-tp4723158.html
> Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.
>


-- 
Tim Bish
twitter: @tabish121
blog: http://timbish.blogspot.com/