You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by mbmerr <mb...@nmi.net> on 2006/06/02 18:14:06 UTC

Multiple sessions on single connection not supported?

I'm trying to create multiple sessions on a single connection, and perform
distinct .commit() commands on them.  I'm not getting the results I expect.

Basically, I'd like to have the following messages come in to the queue:

Message 1
Message 2
Message 3

I want to commit messages 1 and 3, but not 2.  I do this by opening distinct
sessions for each message.  When I close the connection, Message 2 should be
rolled back, since it was never committed.  But I don't get those results. 
I've tested this with ActiveMQ 4.0 and SNAPSHOT as of 2006-06-02.

And I get different results depending on whether I read the messages with
.receiveNoWait() vs. .receive(), and which version of ActiveMQ I'm using.

Any thoughts?  The sample code gives the expected results under JORAM
4.3.18.  Thanks for your time.


EXPECTED RESULTS 

     [java] Opening connection for writes
     [java] Sending message 1
     [java] Sending message 2
     [java] Sending message 3
     [java] Closing connection for writes
     [java] Opening connection for reads
     [java] Opening session for read
     [java] Polling for message...
     [java] Received message with text [Message 1]
     [java] Committing session for message [Message 1]
     [java] Opening session for read
     [java] Polling for message...
     [java] Received message with text [Message 2]
     [java] Performing no action against session for message [Message 2]
     [java] Opening session for read
     [java] Polling for message...
     [java] Received message with text [Message 3]
     [java] Committing session for message [Message 3]
     [java] Closing connection for reads - should force rollback of any
uncommitted session
     [java] Sends and receives complete - now emptying queue of remaining
messages
     [java] Opening connection for read
     [java] Looking for lingering message...
     [java] Found message on queue [Message 2]
     [java] Looking for lingering message...
     [java] No more lingering messages
     [java] Closing connection for read
     [java] All work completed

ACTUAL RESULTS (4.0, using .receiveNoWait())

     [java] Opening connection for writes
     [java] Sending message 1
     [java] Sending message 2
     [java] Sending message 3
     [java] Closing connection for writes
     [java] Opening connection for reads
     [java] Opening session for read
     [java] Polling for message...
     [java] Received message with text [Message 1]
     [java] Committing session for message [Message 1]
     [java] Opening session for read
     [java] Polling for message...
     [java] Opening session for read
     [java] Polling for message...
     [java] Closing connection for reads - should force rollback of any
uncommitted session
     [java] Sends and receives complete - now emptying queue of remaining
messages
     [java] Opening connection for read
     [java] Looking for lingering message...
     [java] Found message on queue [Message 2]
     [java] Looking for lingering message...
     [java] Found message on queue [Message 3]
     [java] Looking for lingering message...
     [java] No more lingering messages
     [java] Closing connection for read
     [java] All work completed

ACTUAL RESULTS (SNAPSHOT, using .receiveNoWait())

     [java] Opening connection for writes
     [java] Sending message 1
     [java] Sending message 2
     [java] Sending message 3
     [java] Closing connection for writes
     [java] Opening connection for reads
     [java] Opening session for read
     [java] Polling for message...
     [java] Received message with text [Message 1]
     [java] Committing session for message [Message 1]
     [java] Opening session for read
     [java] Polling for message...
     [java] Opening session for read
     [java] Polling for message...
     [java] Closing connection for reads - should force rollback of any
uncommitted session
     [java] Sends and receives complete - now emptying queue of remaining
messages
     [java] Opening connection for read
     [java] Looking for lingering message...
     [java] No more lingering messages
     [java] Closing connection for read
     [java] All work completed

ACTUAL RESULTS (4.0 and SNAPSHOT, using .receive())

     [java] Opening connection for writes
     [java] Sending message 1
     [java] Sending message 2
     [java] Sending message 3
     [java] Closing connection for writes
     [java] Opening connection for reads
     [java] Opening session for read
     [java] Polling for message...
     [java] Received message with text [Message 1]
     [java] Committing session for message [Message 1]
     [java] Opening session for read
     [java] Polling for message...
[code hangs here]


TEST CODE

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Hashtable;

public class MultipleSessionTest {

    private static final String DESTNAME = "MULTISESSIONQUEUETEST";

    private static final int TOTAL_MESSAGES = 3;
    private static final int[] MSGS_TO_COMMIT = new int[]{ 1, 3 };

    private Context context;
    private ConnectionFactory factory;

    public static void main(String[] args) throws JMSException,
NamingException {

        MultipleSessionTest mst = new MultipleSessionTest();

        // stock the queue with 3 messages
        mst.sendMessages(TOTAL_MESSAGES);

        // process the messages, doing a session.commit() on just messages 1
and 3
        mst.receiveMessages(TOTAL_MESSAGES, MSGS_TO_COMMIT);

        // destination should now contain just those messages that were not
explicitly committed - list them
        mst.emptyDestination();

        System.out.println("All work completed");

    }

    public MultipleSessionTest() throws NamingException {

        Hashtable properties = new Hashtable();

        // ActiveMQ
        properties.put(Context.INITIAL_CONTEXT_FACTORY,
"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        properties.put(Context.PROVIDER_URL, "tcp://localhost:61616");
        // OpenJMS
//        properties.put(Context.INITIAL_CONTEXT_FACTORY,
"org.exolab.jms.jndi.InitialContextFactory");
//        properties.put(Context.PROVIDER_URL, "tcp://localhost:3035/");
        // JORAM
//        properties.put(Context.INITIAL_CONTEXT_FACTORY,
"fr.dyade.aaa.jndi2.client.NamingContextFactory");
//       
properties.put(fr.dyade.aaa.jndi2.client.NamingContextFactory.JAVA_HOST_PROPERTY,
"localhost");
//       
properties.put(fr.dyade.aaa.jndi2.client.NamingContextFactory.JAVA_PORT_PROPERTY,
"16400");

        context = new InitialContext(properties);
        // ActiveMQ
        factory = (ConnectionFactory) context.lookup("ConnectionFactory");
        // OpenJMS
//        factory = (ConnectionFactory) context.lookup("ConnectionFactory");
        // JORAM
//        factory = (ConnectionFactory) context.lookup("qcf");

    }

    private void sendMessages(int totalMessages) throws JMSException,
NamingException {

        // open connection to JMS provider
        System.out.println("Opening connection for writes");
        Connection connection = factory.createConnection();

        // open session on connection (true for transacted session)
        Session session = connection.createSession(true, 0);

        // create a destination
        Destination destination = getDestination(session);

        // create a producer on the session for the destination
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);

        // send messages via the producer to the destination
        for (int i = 1; i <= totalMessages; i++) {
            sendMessage(session, producer, i);
        }

        // close everything (closing connection handles other closes)
        System.out.println("Closing connection for writes");
        connection.close();

    }

    private void sendMessage(Session session, MessageProducer producer, int
num) throws JMSException {

        System.out.println("Sending message " + num);
        TextMessage message = session.createTextMessage("Message " + num);
        producer.send(message);
        session.commit();

    }

    private void receiveMessages(int totalMessages, int[] commitList) throws
JMSException, NamingException {

        // open connection to JMS server
        Connection connection = factory.createConnection();
        System.out.println("Opening connection for reads");

        // receive messages, committing them if requested
        for (int i = 1; i <= totalMessages; i++) {
            receiveMessage(connection, commitList);
        }

        // close the connection - should force rollback for any messages not
on commitList
        System.out.println("Closing connection for reads - should force
rollback of any uncommitted session");
        connection.close();

    }

    private void receiveMessage(Connection connection, int[] commitList)
throws JMSException, NamingException {

        // open session on connection (true for transacted session)
        System.out.println("Opening session for read");
        Session session = connection.createSession(true, 0);

        // create a destination
        Destination destination = getDestination(session);

        // create a consumer on the session for the destination
        MessageConsumer consumer = session.createConsumer(destination);

        // start the connection (allows message delivery to occur)
        connection.start();

        System.out.println("Polling for message...");
        // NOTE: different behavior when .receive() vs. .receiveNoWait()?
        Message message = consumer.receiveNoWait();

        if (message != null) {

            if (message instanceof TextMessage) {

                TextMessage textMessage = (TextMessage) message;
                String s = textMessage.getText();
                System.out.println("Received message with text [" + s +
"]");

                boolean committed = false;
                for (int i = 0; i < commitList.length; i++) {
                    if (s.indexOf(commitList[i] + "") != -1) {
                        System.out.println("Committing session for message
[" + s + "]");
                        session.commit();
                        committed = true;
                        break;
                    }
                }

                if (!committed) {
                    System.out.println("Performing no action against session
for message [" + s + "]");
                }

            } else {
                System.err.println("Received message of unexpected type [" +
message + "]");
            }

        }

        // note we never close sessions

    }

    private void emptyDestination() throws JMSException, NamingException {

        System.out.println("Sends and receives complete - now emptying queue
of remaining messages");

        // open connection to JMS server
        System.out.println("Opening connection for read");
        Connection connection = factory.createConnection();

        // open session on connection (true for transacted session)
        Session session = connection.createSession(true, 0);

        // create a destination
        Destination destination = getDestination(session);

        // create a consumer on the session for the destination
        MessageConsumer consumer = session.createConsumer(destination);

        // start the connection (allows message delivery to occur)
        connection.start();

        while (true) {
            System.out.println("Looking for lingering message...");
            Message message = consumer.receiveNoWait();
            if (message != null) {
                if (message instanceof TextMessage) {
                    System.out.println("Found message on queue [" +
((TextMessage) message).getText() + "]");
                } else {
                    System.out.println("Found message on server of
unexpected type [" + message + "]");
                }
            } else {
                System.out.println("No more lingering messages");
                break;
            }
        }

        System.out.println("Closing connection for read");
        session.commit();
        connection.close();

    }

    private Destination getDestination(Session session) throws
NamingException, JMSException {

        // ActiveMQ
        return session.createQueue(DESTNAME);
        // OpenJMS
//        return session.createQueue(DESTNAME);
        // JORAM
//        return (Queue) context.lookup("queue");

    }

}

--
View this message in context: http://www.nabble.com/Multiple-sessions-on-single-connection-not-supported--t1723640.html#a4682596
Sent from the ActiveMQ - User forum at Nabble.com.