You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2009/10/05 17:05:46 UTC

svn commit: r821828 - in /qpid/trunk/qpid/java: systests/src/main/java/org/apache/qpid/test/unit/publish/ systests/src/main/java/org/apache/qpid/test/unit/publish/DirtyTrasactedPubilshTest.java test-profiles/Excludes

Author: ritchiem
Date: Mon Oct  5 15:05:45 2009
New Revision: 821828

URL: http://svn.apache.org/viewvc?rev=821828&view=rev
Log:
QPID-1816 : Add testing for publication on a dirty session after failover.

Added:
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/publish/
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/publish/DirtyTrasactedPubilshTest.java
Modified:
    qpid/trunk/qpid/java/test-profiles/Excludes

Added: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/publish/DirtyTrasactedPubilshTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/publish/DirtyTrasactedPubilshTest.java?rev=821828&view=auto
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/publish/DirtyTrasactedPubilshTest.java (added)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/publish/DirtyTrasactedPubilshTest.java Mon Oct  5 15:05:45 2009
@@ -0,0 +1,403 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.test.unit.publish;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.test.utils.FailoverBaseCase;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TransactionRolledBackException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * QPID-1816 : Whilst testing Acknoledgement after failover this completes testing
+ * of the client after failover. When we have a dirty session we should receive
+ * an error if we attempt to publish. This test ensures that both in the synchronous
+ * and asynchronous message delivery paths we receive the expected exceptions at
+ * the expected time.
+ */
+public class DirtyTrasactedPubilshTest extends FailoverBaseCase implements ConnectionListener
+{
+    protected CountDownLatch _failoverCompleted = new CountDownLatch(1);
+
+    protected int NUM_MESSAGES;
+    protected Connection _connection;
+    protected Queue _queue;
+    protected Session _consumerSession;
+    protected MessageConsumer _consumer;
+    protected MessageProducer _producer;
+
+    private static final String MSG = "MSG";
+    private static final String SEND_FROM_ON_MESSAGE_TEXT = "sendFromOnMessage";
+    protected CountDownLatch _receviedAll;
+    protected AtomicReference<Exception> _causeOfFailure = new AtomicReference<Exception>(null);
+
+    @Override
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        NUM_MESSAGES = 10;
+
+        _queue = getTestQueue();
+
+        //Create Producer put some messages on the queue
+        _connection = getConnection();
+    }
+
+    /**
+     * Initialise the test variables
+     * @param transacted is this a transacted test
+     * @param mode if not trasacted then what ack mode to use
+     * @throws Exception if there is a setup issue.
+     */
+    protected void init(boolean transacted, int mode) throws Exception
+    {
+        _consumerSession = _connection.createSession(transacted, mode);
+        _consumer = _consumerSession.createConsumer(_queue);
+        _producer = _consumerSession.createProducer(_queue);
+
+        // These should all end up being prefetched by session
+        sendMessage(_consumerSession, _queue, 1);
+
+        assertEquals("Wrong number of messages on queue", 1,
+                     ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue));
+    }
+
+    /**
+     * If a transacted session has failed over whilst it has uncommitted sent
+     * data then we need to throw a TransactedRolledbackException on commit()
+     *
+     * The alternative would be to maintain a replay buffer so that the message
+     * could be resent. This is not currently implemented
+     *
+     * @throws Exception if something goes wrong.
+     */
+    public void testDirtySendingSynchronousTransacted() throws Exception
+    {
+        Session producerSession = _connection.createSession(true, Session.SESSION_TRANSACTED);
+
+        // Ensure we get failover notifications
+        ((AMQConnection) _connection).setConnectionListener(this);
+
+        MessageProducer producer = producerSession.createProducer(_queue);
+
+        // Create and send message 0
+        Message msg = producerSession.createMessage();
+        msg.setIntProperty(INDEX, 0);
+        producer.send(msg);
+
+        // DON'T commit message .. fail connection
+
+        failBroker(getFailingPort());
+
+        // Ensure destination exists for sending
+        producerSession.createConsumer(_queue).close();
+
+        // Send the next message
+        msg.setIntProperty(INDEX, 1);
+        try
+        {
+            producer.send(msg);
+            fail("Should fail with Qpid as we provide early warning of the dirty session via a JMSException.");
+        }
+        catch (JMSException jmse)
+        {
+            assertEquals("Early warning of dirty session not correct",
+                         "Failover has occurred and session is dirty so unable to send.", jmse.getMessage());
+        }
+
+        // Ignore that the session is dirty and attempt to commit to validate the
+        // exception is thrown. AND that the above failure notification did NOT
+        // clean up the session.
+
+        try
+        {
+            producerSession.commit();
+            fail("Session is dirty we should get an TransactionRolledBackException");
+        }
+        catch (TransactionRolledBackException trbe)
+        {
+            // Normal path.
+        }
+
+        // Resending of messages should now work ok as the commit was forcilbly rolledback
+        msg.setIntProperty(INDEX, 0);
+        producer.send(msg);
+        msg.setIntProperty(INDEX, 1);
+        producer.send(msg);
+
+        producerSession.commit();
+
+        assertEquals("Wrong number of messages on queue", 2,
+                     ((AMQSession) producerSession).getQueueDepth((AMQDestination) _queue));
+    }
+
+    /**
+     * If a transacted session has failed over whilst it has uncommitted sent
+     * data then we need to throw a TransactedRolledbackException on commit()
+     *
+     * The alternative would be to maintain a replay buffer so that the message
+     * could be resent. This is not currently implemented
+     *
+     * @throws Exception if something goes wrong.
+     */
+    public void testDirtySendingOnMessageTransacted() throws Exception
+    {
+        NUM_MESSAGES = 1;
+        _receviedAll = new CountDownLatch(NUM_MESSAGES);
+        ((AMQConnection) _connection).setConnectionListener(this);
+
+        init(true, Session.SESSION_TRANSACTED);
+
+        _consumer.setMessageListener(new MessageListener()
+        {
+
+            public void onMessage(Message message)
+            {
+                try
+                {
+                    // Create and send message 0
+                    Message msg = _consumerSession.createMessage();
+                    msg.setIntProperty(INDEX, 0);
+                    _producer.send(msg);
+
+                    // DON'T commit message .. fail connection
+
+                    failBroker(getFailingPort());
+
+                    // rep
+                    repopulateBroker();
+
+                    // Destination will exist as this failBroker will populate
+                    // the queue with 1 message
+
+                    // Send the next message
+                    msg.setIntProperty(INDEX, 1);
+                    try
+                    {
+                        _producer.send(msg);
+                        fail("Should fail with Qpid as we provide early warning of the dirty session via a JMSException.");
+                    }
+                    catch (JMSException jmse)
+                    {
+                        assertEquals("Early warning of dirty session not correct",
+                                     "Failover has occurred and session is dirty so unable to send.", jmse.getMessage());
+                    }
+
+                    // Ignore that the session is dirty and attempt to commit to validate the
+                    // exception is thrown. AND that the above failure notification did NOT
+                    // clean up the session.
+
+                    try
+                    {
+                        _consumerSession.commit();
+                        fail("Session is dirty we should get an TransactionRolledBackException");
+                    }
+                    catch (TransactionRolledBackException trbe)
+                    {
+                        // Normal path.
+                    }
+
+                    // Resend messages
+                    msg.setIntProperty(INDEX, 0);
+                    msg.setStringProperty(MSG, SEND_FROM_ON_MESSAGE_TEXT);
+                    _producer.send(msg);
+                    msg.setIntProperty(INDEX, 1);
+                    msg.setStringProperty(MSG, SEND_FROM_ON_MESSAGE_TEXT);
+                    _producer.send(msg);
+
+                    _consumerSession.commit();
+
+                    // Stop this consumer .. can't do _consumer.stop == DEADLOCK
+                    // this doesn't seem to stop dispatcher running
+                    _connection.stop();
+
+                    // Signal that the onMessage send part of test is complete
+                    // main thread can validate that messages are correct
+                    _receviedAll.countDown();
+
+                }
+                catch (Exception e)
+                {
+                    fail(e);
+                }
+
+            }
+
+        });
+
+        _connection.start();
+
+        if (!_receviedAll.await(10000L, TimeUnit.MILLISECONDS))
+        {
+            // Check to see if we ended due to an exception in the onMessage handler
+            Exception cause = _causeOfFailure.get();
+            if (cause != null)
+            {
+                cause.printStackTrace();
+                fail(cause.getMessage());
+            }
+            else
+            {
+                fail("All messages not received:" + _receviedAll.getCount() + "/" + NUM_MESSAGES);
+            }
+        }
+
+        // Check to see if we ended due to an exception in the onMessage handler
+        Exception cause = _causeOfFailure.get();
+        if (cause != null)
+        {
+            cause.printStackTrace();
+            fail(cause.getMessage());
+        }
+
+        _consumer.close();
+        _consumerSession.close();
+
+        _consumerSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        _connection.start();
+
+        // Validate that we could send the messages as expected.
+        assertEquals("Wrong number of messages on queue", 3,
+                     ((AMQSession) _consumerSession).getQueueDepth((AMQDestination) _queue));
+
+        MessageConsumer consumer = _consumerSession.createConsumer(_queue);
+
+        //Validate the message sent to setup the failed over broker.
+        Message message = consumer.receive(1000);
+        assertNotNull("Message " + 0 + " not received.", message);
+        assertEquals("Incorrect message received", 0, message.getIntProperty(INDEX));
+
+        // Validate the two messages sent from within the onMessage
+        for (int index = 0; index <= 1; index++)
+        {
+            message = consumer.receive(1000);
+            assertNotNull("Message " + index + " not received.", message);
+            assertEquals("Incorrect message received", index, message.getIntProperty(INDEX));
+            assertEquals("Incorrect message text for message:" + index, SEND_FROM_ON_MESSAGE_TEXT, message.getStringProperty(MSG));
+        }
+
+        assertNull("Extra message received.", consumer.receiveNoWait());
+
+        _consumerSession.close();
+
+        assertEquals("Wrong number of messages on queue", 0,
+                     ((AMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE)).getQueueDepth((AMQDestination) _queue));
+    }
+
+    private void repopulateBroker() throws Exception
+    {
+        // Repopulate this new broker so we can test what happends after failover
+
+        //Get the connection to the first (main port) broker.
+        Connection connection = getConnection();
+        // Use a transaction to send messages so we can be sure they arrive.
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        // ensure destination is created.
+        session.createConsumer(_queue).close();
+
+        sendMessage(session, _queue, NUM_MESSAGES);
+
+        assertEquals("Wrong number of messages on queue", NUM_MESSAGES,
+                     ((AMQSession) session).getQueueDepth((AMQDestination) _queue));
+
+        connection.close();
+    }
+
+    // AMQConnectionListener Interface.. used so we can validate that we
+    // actually failed over.
+
+    public void bytesSent(long count)
+    {
+    }
+
+    public void bytesReceived(long count)
+    {
+    }
+
+    public boolean preFailover(boolean redirect)
+    {
+        //Allow failover
+        return true;
+    }
+
+    public boolean preResubscribe()
+    {
+        //Allow failover
+        return true;
+    }
+
+    public void failoverComplete()
+    {
+        _failoverCompleted.countDown();
+    }
+
+    /**
+     * Override so we can block until failover has completd
+     *
+     * @param port int the port of the broker to fail.
+     */
+    @Override
+    public void failBroker(int port)
+    {
+        super.failBroker(port);
+
+        try
+        {
+            if (!_failoverCompleted.await(DEFAULT_FAILOVER_TIME, TimeUnit.MILLISECONDS))
+            {
+                fail("Failover did not occur in specified time:" + DEFAULT_FAILOVER_TIME);
+            }
+        }
+        catch (InterruptedException e)
+        {
+            fail("Failover was interuppted");
+        }
+    }
+
+    /**
+     * Pass the given exception back to the waiting thread to fail the test run.
+     *
+     * @param e The exception that is causing the test to fail.
+     */
+    protected void fail(Exception e)
+    {
+        _causeOfFailure.set(e);
+        // End the test.
+        while (_receviedAll.getCount() != 0)
+        {
+            _receviedAll.countDown();
+        }
+    }
+}

Modified: qpid/trunk/qpid/java/test-profiles/Excludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/Excludes?rev=821828&r1=821827&r2=821828&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/Excludes (original)
+++ qpid/trunk/qpid/java/test-profiles/Excludes Mon Oct  5 15:05:45 2009
@@ -19,3 +19,9 @@
 
 // QPID-2081 :The configuration changes are now highlighting the close race condition
 org.apache.qpid.server.security.acl.SimpleACLTest#*
+
+// QPID-1816 : Client Ack has not been addressed
+org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testDirtyClientAck
+org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#testClientAck
+org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#testClientAck
+// testDirtyClientAck is ok as it fails as expected.. problem is it always fails :)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org