You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/11/29 17:19:24 UTC

[1/2] qpid-broker-j git commit: QPID-6933: [System Tests] Move RecoverTest into JMS 1.1 system tests

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 1298c8914 -> 37b6c9dbc


QPID-6933: [System Tests] Move RecoverTest into JMS 1.1 system tests


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/2c9ce95d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/2c9ce95d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/2c9ce95d

Branch: refs/heads/master
Commit: 2c9ce95dd90f4e1bcc920a6bc9ab2ccbfadb5859
Parents: 1298c89
Author: Alex Rudyy <or...@apache.org>
Authored: Wed Nov 29 16:12:07 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Nov 29 16:12:07 2017 +0000

----------------------------------------------------------------------
 .../jms_1_1/acknowledge/RecoverTest.java        | 243 +++++++++++++++++++
 .../jms_1_1/message/StreamMessageTest.java      |   2 -
 .../apache/qpid/test/unit/ack/RecoverTest.java  | 210 +---------------
 3 files changed, 246 insertions(+), 209 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2c9ce95d/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/acknowledge/RecoverTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/acknowledge/RecoverTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/acknowledge/RecoverTest.java
new file mode 100644
index 0000000..cc41859
--- /dev/null
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/acknowledge/RecoverTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systests.jms_1_1.acknowledge;
+
+import static org.apache.qpid.systests.Utils.INDEX;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.systests.JmsTestBase;
+import org.apache.qpid.systests.Utils;
+
+public class RecoverTest extends JmsTestBase
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(RecoverTest.class);
+    private static final int SENT_COUNT = 4;
+
+    @Test
+    public void testRecoverForClientAcknowledge() throws Exception
+    {
+        Queue queue = createQueue(getTestName());
+        Connection connection = getConnection();
+        try
+        {
+            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(queue);
+            produceTestMessages(connection, queue, SENT_COUNT);
+            connection.start();
+
+            Message message = receiveAndValidateMessage(consumer, 0);
+            message.acknowledge();
+
+            receiveAndValidateMessage(consumer, 1);
+            receiveAndValidateMessage(consumer, 2);
+            session.recover();
+
+            receiveAndValidateMessage(consumer, 1);
+            receiveAndValidateMessage(consumer, 2);
+            receiveAndValidateMessage(consumer, 3);
+        }
+        finally
+        {
+            connection.close();
+        }
+    }
+
+    @Test
+    public void testMessageOrderForClientAcknowledge() throws Exception
+    {
+        Queue queue = createQueue(getTestName());
+        Connection connection = getConnection();
+        try
+        {
+            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(queue);
+            produceTestMessages(connection, queue, SENT_COUNT);
+            connection.start();
+
+            int messageSeen = 0;
+            int expectedIndex = 0;
+            while (expectedIndex < SENT_COUNT)
+            {
+                Message message = receiveAndValidateMessage(consumer, expectedIndex);
+
+                //don't ack the message until we receive it 5 times
+                if (messageSeen < 5)
+                {
+                    LOGGER.debug(String.format("Recovering message with index %d", expectedIndex));
+                    session.recover();
+                    messageSeen++;
+                }
+                else
+                {
+                    LOGGER.debug(String.format("Acknowledging message with index %d", expectedIndex));
+                    messageSeen = 0;
+                    expectedIndex++;
+                    message.acknowledge();
+                }
+            }
+        }
+        finally
+        {
+            connection.close();
+        }
+    }
+
+    @Test
+    public void testAcknowledgePerConsumer() throws Exception
+    {
+        Queue queue1 = createQueue("Q1");
+        Queue queue2 = createQueue("Q2");
+
+        Connection consumerConnection = getConnection();
+        try
+        {
+            Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            MessageConsumer consumer1 = consumerSession.createConsumer(queue1);
+            MessageConsumer consumer2 = consumerSession.createConsumer(queue2);
+
+            Connection producerConnection = getConnection();
+            try
+            {
+                Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageProducer producer1 = producerSession.createProducer(queue1);
+                MessageProducer producer2 = producerSession.createProducer(queue2);
+
+                producer1.send(producerSession.createTextMessage("msg1"));
+                producer2.send(producerSession.createTextMessage("msg2"));
+            }
+            finally
+            {
+                producerConnection.close();
+            }
+            consumerConnection.start();
+
+            TextMessage message2 = (TextMessage) consumer2.receive(getReceiveTimeout());
+            assertNotNull(message2);
+            assertEquals("msg2", message2.getText());
+
+            message2.acknowledge();
+            consumerSession.recover();
+
+            TextMessage message1 = (TextMessage) consumer1.receive(getReceiveTimeout());
+            assertNotNull(message1);
+            assertEquals("msg1", message1.getText());
+        }
+        finally
+        {
+            consumerConnection.close();
+        }
+    }
+
+    @Test
+    public void testRecoverInAutoAckListener() throws Exception
+    {
+        Queue queue = createQueue(getTestName());
+        Connection connection = getConnection();
+        try
+        {
+            Session consumerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = consumerSession.createConsumer(queue);
+            produceTestMessages(connection, queue, 1);
+
+            final CountDownLatch awaitMessages = new CountDownLatch(2);
+            final AtomicReference<Throwable> listenerCaughtException = new AtomicReference<>();
+            final AtomicInteger deliveryCounter = new AtomicInteger();
+            consumer.setMessageListener(message -> {
+                try
+                {
+                    deliveryCounter.incrementAndGet();
+                    assertEquals("Unexpected JMSRedelivered", deliveryCounter.get() > 1, message.getJMSRedelivered());
+                    if (deliveryCounter.get() == 1)
+                    {
+                        consumerSession.recover();
+                    }
+                }
+                catch (Throwable e)
+                {
+                    LOGGER.error("Unexpected failure on message receiving", e);
+                    listenerCaughtException.set(e);
+                }
+                finally
+                {
+                    awaitMessages.countDown();
+                }
+            });
+
+            connection.start();
+
+            assertTrue("Message is not received in timely manner",
+                       awaitMessages.await(getReceiveTimeout() * 2, TimeUnit.MILLISECONDS));
+            assertEquals("Message not received the correct number of times.",
+                         2, deliveryCounter.get());
+            assertNull("No exception should be caught by listener : " + listenerCaughtException.get(),
+                       listenerCaughtException.get());
+        }
+        finally
+        {
+            connection.close();
+        }
+    }
+
+    private Message receiveAndValidateMessage(final MessageConsumer consumer,
+                                              final int messageIndex)
+            throws JMSException
+    {
+        Message message = consumer.receive(getReceiveTimeout());
+        assertNotNull(String.format("Expected message '%d' is not received", messageIndex), message);
+        assertEquals("Received message out of order", messageIndex, message.getIntProperty(INDEX));
+        return message;
+    }
+
+    private void produceTestMessages(final Connection connection, final Queue queue, final int messageNumber)
+            throws Exception
+    {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        try
+        {
+            Utils.sendMessage(session, queue, messageNumber);
+        }
+        finally
+        {
+            session.close();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2c9ce95d/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/message/StreamMessageTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/message/StreamMessageTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/message/StreamMessageTest.java
index 625f372..07b8926 100644
--- a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/message/StreamMessageTest.java
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/message/StreamMessageTest.java
@@ -32,14 +32,12 @@ import javax.jms.Connection;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageEOFException;
-import javax.jms.MessageListener;
 import javax.jms.MessageNotWriteableException;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.StreamMessage;
 
-import junit.framework.TestCase;
 import org.junit.Test;
 
 import org.apache.qpid.systests.JmsTestBase;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2c9ce95d/systests/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java b/systests/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
index 7af7b29..06200ea 100644
--- a/systests/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
+++ b/systests/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
@@ -20,7 +20,6 @@
 package org.apache.qpid.test.unit.ack;
 
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -28,10 +27,8 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
-import javax.jms.TextMessage;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,6 +36,9 @@ import org.slf4j.LoggerFactory;
 import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
 
+/**
+ * Legacy JMS client specific tests
+ */
 public class RecoverTest extends QpidBrokerTestCase
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(RecoverTest.class);
@@ -46,7 +46,6 @@ public class RecoverTest extends QpidBrokerTestCase
     private static final int SENT_COUNT = 4;
 
     private volatile Exception _error;
-    private AtomicInteger _count;
     private long _timeout;
     private Connection _connection;
     private Session _consumerSession;
@@ -57,7 +56,6 @@ public class RecoverTest extends QpidBrokerTestCase
     {
         super.setUp();
         _error = null;
-        _count = new AtomicInteger();
         _timeout = getReceiveTimeout();
     }
 
@@ -116,25 +114,6 @@ public class RecoverTest extends QpidBrokerTestCase
         LOGGER.info("No messages redelivered as is expected");
     }
 
-    public void testRecoverResendsMsgs() throws Exception
-    {
-        initTest();
-
-        Message message = validateNextMessages(1, 0);
-        message.acknowledge();
-        LOGGER.info("Received and acknowledged first message");
-
-        _consumer.receive();
-        _consumer.receive();
-        _consumer.receive();
-        LOGGER.info("Received all four messages. Calling recover with three outstanding messages");
-        // no ack for last three messages so when I call recover I expect to get three messages back
-
-        _consumerSession.recover();
-
-        validateRemainingMessages(3);
-    }
-
     public void testRecoverResendsMsgsAckOnEarlier() throws Exception
     {
         initTest();
@@ -173,189 +152,6 @@ public class RecoverTest extends QpidBrokerTestCase
         validateRemainingMessages(0);
     }
 
-    public void testAcknowledgePerConsumer() throws Exception
-    {
-        Connection con = getConnection();
-
-        Session consumerSession = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        Queue queue = createTestQueue(consumerSession, "Q1");
-        Queue queue2 = createTestQueue(consumerSession, "Q2");
-        MessageConsumer consumer = consumerSession.createConsumer(queue);
-        MessageConsumer consumer2 = consumerSession.createConsumer(queue2);
-
-        Connection con2 = getConnection();
-        Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        MessageProducer producer = producerSession.createProducer(queue);
-        MessageProducer producer2 = producerSession.createProducer(queue2);
-
-        producer.send(producerSession.createTextMessage("msg1"));
-        producer2.send(producerSession.createTextMessage("msg2"));
-
-        con2.close();
-
-        LOGGER.info("Starting connection");
-        con.start();
-
-        TextMessage tm2 = (TextMessage) consumer2.receive(_timeout);
-        assertNotNull(tm2);
-        assertEquals("msg2", tm2.getText());
-
-        tm2.acknowledge();
-        consumerSession.recover();
-
-        TextMessage tm1 = (TextMessage) consumer.receive(_timeout);
-        assertNotNull(tm1);
-        assertEquals("msg1", tm1.getText());
-
-        con.close();
-
-    }
-
-    public void testRecoverInAutoAckListener() throws Exception
-    {
-        Connection con = getConnection();
-
-        final Session consumerSession = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = createTestQueue(consumerSession, "Q3");
-        MessageConsumer consumer = consumerSession.createConsumer(queue);
-        MessageProducer producer = consumerSession.createProducer(queue);
-        producer.send(consumerSession.createTextMessage("hello"));
-
-        final Object lock = new Object();
-
-        consumer.setMessageListener(new MessageListener()
-        {
-
-            @Override
-            public void onMessage(Message message)
-            {
-                try
-                {
-                    _count.incrementAndGet();
-                    if (_count.get() == 1)
-                    {
-                        if (message.getJMSRedelivered())
-                        {
-                            setError(new Exception("Message marked as redelivered on what should be first delivery attempt"));
-                        }
-
-                        consumerSession.recover();
-                    }
-                    else if (_count.get() == 2)
-                    {
-                        if (!message.getJMSRedelivered())
-                        {
-                            setError(new Exception("Message not marked as redelivered on what should be second delivery attempt"));
-                        }
-                    }
-                    else
-                    {
-                        LOGGER.error(message.toString());
-                        setError(new Exception("Message delivered too many times!: " + _count));
-                    }
-                }
-                catch (JMSException e)
-                {
-                    LOGGER.error("Error recovering session: " + e, e);
-                    setError(e);
-                }
-
-                synchronized (lock)
-                {
-                    lock.notify();
-                }
-            }
-        });
-
-        con.start();
-
-        long waitTime = 30000L;
-        long waitUntilTime = System.currentTimeMillis() + waitTime;
-
-        synchronized (lock)
-        {
-            while ((_count.get() <= 1) && (waitTime > 0))
-            {
-                lock.wait(waitTime);
-                if (_count.get() <= 1)
-                {
-                    waitTime = waitUntilTime - System.currentTimeMillis();
-                }
-            }
-        }
-
-        Thread.sleep(1000);
-
-        if (_error != null)
-        {
-            throw _error;
-        }
-
-        assertEquals("Message not received the correct number of times.",
-                     2, _count.get());
-    }
-
-    private void setError(Exception e)
-    {
-        _error = e;
-    }
-    
-    /**
-     * Goal : Check if ordering is preserved when doing recovery under reasonable circumstances.
-     *        Refer QPID-2471 for more details. 
-     * Test strategy :
-     * Send 8 messages to a topic.
-     * The consumer will call recover until it sees a message 5 times,
-     * at which point it will ack that message.
-     * It will continue the above until it acks all the messages.
-     * While doing so it will verify that the messages are not 
-     * delivered out of order.
-     */
-    public void testOrderingWithSyncConsumer() throws Exception
-    {
-        Connection con = getConnection();
-        Session session = con.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        Destination topic = createTopic(con, "myTopic");
-        MessageConsumer cons = session.createConsumer(topic);
-        
-        sendMessage(session,topic,8);
-        con.start();
-
-        int messageSeen = 0;
-        int expectedIndex = 0;
-        long startTime = System.currentTimeMillis();
-        
-        while(expectedIndex < 8)
-        {
-            // Based on historical data, on average the test takes about 6 secs to complete.
-            if (System.currentTimeMillis() - startTime > 8000)
-            {
-                fail("Test did not complete on time. Received " + 
-                     expectedIndex + " msgs so far. Please check the logs");
-            }
-            
-            Message message = cons.receive(_timeout);
-            int actualIndex = message.getIntProperty(INDEX);
-            
-            assertEquals("Received Message Out Of Order",expectedIndex, actualIndex);
-                        
-            //don't ack the message until we receive it 5 times
-            if( messageSeen < 5 ) 
-            {
-                LOGGER.debug("Ignoring message " + actualIndex + " and calling recover");
-                session.recover();
-                messageSeen++;
-            }
-            else
-            {
-                messageSeen = 0;
-                expectedIndex++;
-                message.acknowledge();
-                LOGGER.debug("Acknowledging message " + actualIndex);
-            }
-        }        
-    }
-    
     /**
      * Goal : Same as testOderingWithSyncConsumer
      * Test strategy :


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-broker-j git commit: QPID-6933: [System Tests] Move ClientAcknowledgeTest into JMS 1.1 system tests

Posted by or...@apache.org.
QPID-6933: [System Tests] Move ClientAcknowledgeTest into JMS 1.1 system tests


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/37b6c9db
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/37b6c9db
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/37b6c9db

Branch: refs/heads/master
Commit: 37b6c9dbcdc7feb283cb0aba216939d2afeaf852
Parents: 2c9ce95
Author: Alex Rudyy <or...@apache.org>
Authored: Wed Nov 29 17:19:07 2017 +0000
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Nov 29 17:19:07 2017 +0000

----------------------------------------------------------------------
 .../jms_1_1/acknowledge/AcknowledgeTest.java    | 90 ++++++++++++++++++++
 .../test/unit/ack/ClientAcknowledgeTest.java    | 86 -------------------
 test-profiles/CPPTransientExcludes              |  5 --
 test-profiles/JavaTransientExcludes             |  3 -
 4 files changed, 90 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37b6c9db/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/acknowledge/AcknowledgeTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/acknowledge/AcknowledgeTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/acknowledge/AcknowledgeTest.java
new file mode 100644
index 0000000..65eecc4
--- /dev/null
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/acknowledge/AcknowledgeTest.java
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.systests.jms_1_1.acknowledge;
+
+import static org.apache.qpid.systests.Utils.INDEX;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assume.assumeThat;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+
+import org.junit.Test;
+
+import org.apache.qpid.systests.JmsTestBase;
+import org.apache.qpid.systests.Utils;
+
+public class AcknowledgeTest extends JmsTestBase
+{
+
+    @Test
+    public void testClientAckWithLargeFlusherPeriod() throws Exception
+    {
+        assumeThat(getBrokerAdmin().supportsRestart(), is(true));
+
+        Queue queue = createQueue(getTestName());
+        Connection connection = getConnection();
+        try
+        {
+            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(queue);
+            connection.start();
+
+            Utils.sendMessage(session, queue, 2);
+
+            Message message = consumer.receive(getReceiveTimeout());
+            assertNotNull("Message has not been received", message);
+            assertEquals("Unexpected message is received", 0, message.getIntProperty(INDEX));
+            message.acknowledge();
+
+            message = consumer.receive(getReceiveTimeout());
+            assertNotNull("Second message has not been received", message);
+            assertEquals("Unexpected message is received", 1, message.getIntProperty(INDEX));
+        }
+        finally
+        {
+            connection.close();
+        }
+
+        getBrokerAdmin().restart();
+
+        Connection connection2 = getConnection();
+        try
+        {
+            connection2.start();
+            Session session = connection2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(queue);
+            Message message = consumer.receive(getReceiveTimeout());
+            assertNotNull("Message has not been received after restart", message);
+            assertEquals("Unexpected message is received after restart", 1, message.getIntProperty(INDEX));
+        }
+        finally
+        {
+            connection2.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37b6c9db/systests/src/test/java/org/apache/qpid/test/unit/ack/ClientAcknowledgeTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/test/unit/ack/ClientAcknowledgeTest.java b/systests/src/test/java/org/apache/qpid/test/unit/ack/ClientAcknowledgeTest.java
deleted file mode 100644
index f796973..0000000
--- a/systests/src/test/java/org/apache/qpid/test/unit/ack/ClientAcknowledgeTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.test.unit.ack;
-
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-public class ClientAcknowledgeTest extends QpidBrokerTestCase
-{
-    private static final long ONE_DAY_MS = 1000l * 60 * 60 * 24;
-    private Connection _connection;
-    private Session _consumerSession;
-    private MessageConsumer _consumer;
-    private MessageProducer _producer;
-
-    @Override
-    protected void setUp() throws Exception
-    {
-        super.setUp();
-        _connection = getConnection();
-    }
-
-    /**
-     * Test that message.acknowledge actually acknowledges, regardless of
-     * the flusher thread period, by restarting the broker after calling
-     * acknowledge, and then verifying after restart that the message acked
-     * is no longer present. This test requires a persistent store.
-     */
-    public void testClientAckWithLargeFlusherPeriod() throws Exception
-    {
-        if (isBroker010())
-        {
-            setTestClientSystemProperty("qpid.session.max_ack_delay", Long.toString(ONE_DAY_MS));
-        }
-
-        _consumerSession = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        Queue queue = createTestQueue(_consumerSession);
-        _consumer = _consumerSession.createConsumer(queue);
-        _connection.start();
-
-        _producer = _consumerSession.createProducer(queue);
-        _producer.send(createNextMessage(_consumerSession, 1));
-        _producer.send(createNextMessage(_consumerSession, 2));
-
-        Message message = _consumer.receive(getReceiveTimeout());
-        assertNotNull("Message has not been received", message);
-        assertEquals("Unexpected message is received", 1, message.getIntProperty(INDEX));
-        message.acknowledge();
-
-        //restart broker to allow verification of the acks
-        //without explicitly closing connection (which acks)
-        //Seems to be contrary to the JMS spec " Closing a connection does NOT force an acknowledgment of client-acknowledged sessions."
-        restartDefaultBroker();
-
-        // try to receive the message again, which should fail (as it was ackd)
-        _connection = getConnection();
-        _connection.start();
-        _consumerSession = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-        _consumer = _consumerSession.createConsumer(queue);
-        message = _consumer.receive(getReceiveTimeout());
-        assertNotNull("Message has not been received", message);
-        assertEquals("Unexpected message is received", 2, message.getIntProperty(INDEX));
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37b6c9db/test-profiles/CPPTransientExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/CPPTransientExcludes b/test-profiles/CPPTransientExcludes
index e30e9d6..969b927 100644
--- a/test-profiles/CPPTransientExcludes
+++ b/test-profiles/CPPTransientExcludes
@@ -17,8 +17,3 @@
 // under the License.
 //
 
-// those tests require broker recovery
-org.apache.qpid.test.unit.ct.DurableSubscriberTest#*
-
-// test requires a persistent store
-org.apache.qpid.test.unit.ack.ClientAcknowledgeTest#testClientAckWithLargeFlusherPeriod

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/37b6c9db/test-profiles/JavaTransientExcludes
----------------------------------------------------------------------
diff --git a/test-profiles/JavaTransientExcludes b/test-profiles/JavaTransientExcludes
index 60839e7..632f63d 100644
--- a/test-profiles/JavaTransientExcludes
+++ b/test-profiles/JavaTransientExcludes
@@ -23,9 +23,6 @@ org.apache.qpid.server.store.PersistentStoreTest#*
 org.apache.qpid.server.store.SplitStoreTest#*
 org.apache.qpid.server.logging.AlertingTest#testAlertingReallyWorksWithRestart
 org.apache.qpid.server.logging.AlertingTest#testAlertingReallyWorksWithChanges
-org.apache.qpid.test.unit.ack.ClientAcknowledgeTest#testClientAckWithLargeFlusherPeriod
-
-org.apache.qpid.test.unit.ct.DurableSubscriberTest#*
 
 org.apache.qpid.server.queue.QueueMessageDurabilityTest#*
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org