You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by letalis <b_...@hotmail.com> on 2009/06/17 14:14:56 UTC

Messages lost?

Hello,

I'm new to the activeMQ software, however i've worked with
sonicMQ(progress). 

My problem is the following:
Sometimes i have send a message 2times before it got noticed by the
receiver.

My data is the following:
-I write my software in C#. I have written a connection class within c#.
This class is used in both programs.
-I use queues. I use (for example) Application1.out, Application1.In,
Application2.Out & Application.2.In.
-I have 3 applications: 
App1 is using queue application1.In as send queue and application1.Out as
listen queue.
App2 is using queue application2.In as send queue and application2.Out as
listen queue.
App3 is the routing application. This application listens to all the .Out
queues, analyses the xml message then routes it to the correct .In queue.

The problem appears when i send a message from app1 to app2. I can see that
the message has been routed to app2 only sometimes the message is not
delivered.

Hopefully someone can view my code and see what i'm doing wrong. 

Kind reagards,

Letalis

Attatched is the source for app1 & app2 + the connector class.

<connector class>
public delegate void messageReceived(object o,
Connection.connection.receiveMessage e);

    public class connection
    {
        private const int CONNECTION_RETRY_PERIOD = 1000; // milliseconds (1
sec)
        private String DEFAULT_BROKER = "tcp://localhost:61616";
        private String DEFAULT_BROKER_ADDRESS = "localhost";
        private int DEFAULT_BROKER_PORT = 61616;
        private String DEFAULT_USERNAME = "Administrator";
        private String DEFAULT_PASSWORD = "Administrator";
        private int DEFAULT_TTL = 0; // milliseconds (0 = forever)

        private ConnectionFactory factory;
        private IConnection connector;

        public event messageReceived incommingMessage;
        private static logging log;
        private IMessageProducer sender;
        private ISession sendSession;
        private ISession receiveSession;

        private static string _rQueue;
        private static string _sQueue;
        private static string _targetListener;
        private static string _serviceName;

        private bool connectionDropped = false;


        public connection(string serviceName, bool loggingEnabled)
        {
            _serviceName = serviceName;
            log = new logging(_serviceName, loggingEnabled);
        }
        #region Getters & setters
        public string serverAddres
        {
            get { return DEFAULT_BROKER_ADDRESS; }
            set
            {
                DEFAULT_BROKER_ADDRESS = value;
                DEFAULT_BROKER = "tcp://"+ DEFAULT_BROKER_ADDRESS + ":" +
DEFAULT_BROKER_PORT;
            }
        }

        public int serverPort
        {
            get { return DEFAULT_BROKER_PORT; }
            set
            {
                DEFAULT_BROKER_PORT = value;
                DEFAULT_BROKER = "tcp://" + DEFAULT_BROKER_ADDRESS + ":" +
DEFAULT_BROKER_PORT;
            }
        }

        public string username
        {
            get { return DEFAULT_USERNAME; }
            set { DEFAULT_USERNAME = value; }
        }

        public string password
        {
            get { return DEFAULT_PASSWORD; }
            set { DEFAULT_PASSWORD = value; }
        }

        public int timeout
        {
            get { return DEFAULT_TTL; }
            set { DEFAULT_TTL = value; }
        }
        #endregion

        public bool connectQueues(string rQueue, string sQueue, string
target)
        {
            _rQueue = rQueue;
            _sQueue = sQueue;

            // Create a connection.
            lock (this)
            {
                factory = null;
                while (factory == null)
                {
                    try
                    {
                        log.addToLogFile("Attempting to create connection
with " + DEFAULT_BROKER, logging.logtype.info);

                        factory = new ConnectionFactory(DEFAULT_BROKER);
                        connector =
factory.CreateConnection(DEFAULT_USERNAME, DEFAULT_PASSWORD);
                        log.addToLogFile("Connection with " + DEFAULT_BROKER
+ " created.", logging.logtype.info);
                    }
                    catch (Exception ex)
                    {
                        log.addToLogFile(ex.ToString(),
logging.logtype.error);
                        Console.WriteLine(ex.ToString(),
logging.logtype.error);

                        try
                        {
                            Thread.Sleep(CONNECTION_RETRY_PERIOD);
                        }
                        catch (ThreadInterruptedException)
                        {

                        }
                        continue;
                    }

                    // Create sessions, sender and receiver
                    try
                    {
                        sendSession =
connector.CreateSession(AcknowledgementMode.ClientAcknowledge);
                        if (sQueue != null)
                        {
                            IQueue sendQ = sendSession.GetQueue(_sQueue);
                            sender = sendSession.CreateProducer(sendQ);
                        }

                        receiveSession =
connector.CreateSession(AcknowledgementMode.ClientAcknowledge);
                        if (rQueue != null)
                        {

                            IQueue receiveQ =
receiveSession.GetQueue(_rQueue);
                            IMessageConsumer queueReceiver;


                            if (target != null)
                            {
                                //string t = "target \'" + target + "\'";
                                queueReceiver =
receiveSession.CreateConsumer(receiveQ, "target" + target);
                            }
                            else
                            {
                                queueReceiver =
receiveSession.CreateConsumer(receiveQ);
                            }
                            queueReceiver.Listener += new
MessageListener(queueReceiver_Listener);
                        }
                    }
                    catch (Exception ex)//(Sonic.Jms.JMSException ex)
                    {
                        Console.WriteLine(ex.ToString());
                        return false;
                    }


                    // Start connection
                    try
                    {
                        connector.Start();
                        return true;
                    }
                    catch
                    {
                        return false;
                    }

                }
                return true;
            }
        }

        void queueReceiver_Listener(IMessage message)
        {
            message.Acknowledge();
            receiveMessage res = new receiveMessage(message);
            onMessageReceived(res);
        }

        public void disconnect()
        {
            try
            {
                sendSession.Close();
            }
            catch { }
            try
            {
                receiveSession.Close();
            }
            catch { }
            try
            {
                sender.Close();
            }
            catch { }
            try
            {
                connector.Close();
            }
            catch { }
        }

        public void sendMessage(string message, string target, int ttl, bool
persistent, bool preserve)
        {
            IMessage msg = sendSession.CreateTextMessage(message);

            if (target != null)
            {
                msg.Properties.SetString("target", target);
            }

            TimeSpan ts = new TimeSpan();
            if (ttl > 0)
            {
                ts = new TimeSpan(ttl * TimeSpan.TicksPerMillisecond);

                msg.NMSTimeToLive = ts;
            }

            sender.Send(msg, persistent, 0x0, ts);
        }

        public class receiveMessage : EventArgs
        {
            public readonly IMessage message;

            public receiveMessage(IMessage Message)
            {
                message = Message;
            }
        }

        public void onMessageReceived(receiveMessage e)
        {
            if (incommingMessage != null)
                incommingMessage(new object(), e);
        }

        public string getMessageBodyFromMessage(IMessage message)
        {
            ITextMessage textMessage = (ITextMessage)message;
            return textMessage.Text;
        }

        public string getMessagePropperty(IMessage message, string target)
        {
            return message.Properties.GetString(target);
        }

        public string getTargetFormat(string target)
        {
            return " = \'" + target + "\'";
        }

        public void writeToLog(string logMessage, logging.logtype logType)
        {
            log.addToLogFile(logMessage, logType);
        }

    }
</connector class>

<application 1&2 >
In the application 1&2 I do basicly the same:
-Initialise the conneciton class
-use the connectQueues() function to connect the queues
-initialise the listener.

For sending the message I use the sendmessage function, i have the same
target in sending & receiving.
</application 1&2 >

-- 
View this message in context: http://www.nabble.com/Messages-lost--tp24072091p24072091.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.