You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2011/10/06 18:29:28 UTC

svn commit: r1179695 - in /qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ client/src/test/java/org/apache/qpid/test/unit/message/ systests/src/main/java/org/apache/qpid/test/client/timeouts/ systests/src/main/java/org/apache/qpid/te...

Author: robbie
Date: Thu Oct  6 16:29:27 2011
New Revision: 1179695

URL: http://svn.apache.org/viewvc?rev=1179695&view=rev
Log:
QPID-3526, QPID-3524: make sure the 0-10 client message.acknowledge() actually acknowledges messages immediately, and does so synchronously, and add a test to verify the behaviour. Split the acknowledge() and commit() methods into version-specific session implementations for clarity/reuse, align 0-10 and 0-8/9 transacted publishing behaviour, refactor the preDeliver and postDeliver methods, and remove dead code from consumers.

Applied patch from Oleksandr Rudyy <or...@gmail.com> and myself.

Added:
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/ClientAcknowledgeTest.java
Removed:
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/publish/DirtyTransactedPublishTest.java
Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
    qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java
    qpid/trunk/qpid/java/test-profiles/CPPExcludes
    qpid/trunk/qpid/java/test-profiles/CPPTransientExcludes
    qpid/trunk/qpid/java/test-profiles/Java010Excludes
    qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1179695&r1=1179694&r2=1179695&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Oct  6 16:29:27 2011
@@ -612,30 +612,27 @@ public abstract class AMQSession<C exten
         {
             throw new IllegalStateException("Session is already closed");
         }
-        else if (hasFailedOver())
+        else if (hasFailedOverDirty())
         {
+            //perform an implicit recover in this scenario
+            recover();
+
+            //notify the consumer
             throw new IllegalStateException("has failed over");
         }
 
-        while (true)
+        try
         {
-            Long tag = _unacknowledgedMessageTags.poll();
-            if (tag == null)
-            {
-                break;
-            }
-
-            try
-            {
-                acknowledgeMessage(tag, false);
-            }
-            catch (TransportException e)
-            {
-                throw toJMSException("Exception while acknowledging message(s):" + e.getMessage(), e);
-            }
+            acknowledgeImpl();
+        }
+        catch (TransportException e)
+        {
+            throw toJMSException("Exception while acknowledging message(s):" + e.getMessage(), e);
         }
     }
 
+    protected abstract void acknowledgeImpl() throws JMSException;
+
     /**
      * Acknowledge one or many messages.
      *
@@ -844,42 +841,28 @@ public abstract class AMQSession<C exten
      * @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does
      *                      not mean that the commit is known to have failed, merely that it is not known whether it
      *                      failed or not.
-     * @todo Be aware of possible changes to parameter order as versions change.
      */
     public void commit() throws JMSException
     {
         checkTransacted();
 
-        try
+        //Check that we are clean to commit.
+        if (_failedOverDirty)
         {
-            //Check that we are clean to commit.
-            if (_failedOverDirty)
-            {
-                rollback();
-
-                throw new TransactionRolledBackException("Connection failover has occured since last send. " +
-                                                         "Forced rollback");
-            }
+            rollback();
 
+            throw new TransactionRolledBackException("Connection failover has occured with uncommitted transaction activity." +
+                                                     "The session transaction was rolled back.");
+        }
 
-            // Acknowledge all delivered messages
-            while (true)
-            {
-                Long tag = _deliveredMessageTags.poll();
-                if (tag == null)
-                {
-                    break;
-                }
-
-                acknowledgeMessage(tag, false);
-            }
-            // Commits outstanding messages and acknowledgments
-            sendCommit();
+        try
+        {
+            commitImpl();
             markClean();
         }
         catch (AMQException e)
         {
-            throw new JMSAMQException("Failed to commit: " + e.getMessage() + ":" + e.getCause(), e);
+            throw new JMSAMQException("Exception during commit: " + e.getMessage() + ":" + e.getCause(), e);
         }
         catch (FailoverException e)
         {
@@ -891,8 +874,7 @@ public abstract class AMQSession<C exten
         }
     }
 
-    public abstract void sendCommit() throws AMQException, FailoverException;
-
+    protected abstract void commitImpl() throws AMQException, FailoverException, TransportException;
 
     public void confirmConsumerCancelled(int consumerTag)
     {
@@ -1580,10 +1562,8 @@ public abstract class AMQSession<C exten
 
     abstract public void sync() throws AMQException;
 
-    public int getAcknowledgeMode() throws JMSException
+    public int getAcknowledgeMode()
     {
-        checkNotClosed();
-
         return _acknowledgeMode;
     }
 
@@ -1643,10 +1623,8 @@ public abstract class AMQSession<C exten
         return _ticket;
     }
 
-    public boolean getTransacted() throws JMSException
+    public boolean getTransacted()
     {
-        checkNotClosed();
-
         return _transacted;
     }
 
@@ -3096,21 +3074,11 @@ public abstract class AMQSession<C exten
      *
      * @return boolean true if failover has occured.
      */
-    public boolean hasFailedOver()
+    public boolean hasFailedOverDirty()
     {
         return _failedOverDirty;
     }
 
-    /**
-     * Check to see if any message have been sent in this transaction and have not been commited.
-     *
-     * @return boolean true if a message has been sent but not commited
-     */
-    public boolean isDirty()
-    {
-        return _dirty;
-    }
-
     public void setTicket(int ticket)
     {
         _ticket = ticket;

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1179695&r1=1179694&r2=1179695&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Oct  6 16:29:27 2011
@@ -412,25 +412,6 @@ public class AMQSession_0_10 extends AMQ
         }
     }
 
-
-    /**
-     * Commit the receipt and the delivery of all messages exchanged by this session resources.
-     */
-    public void sendCommit() throws AMQException, FailoverException
-    {
-        getQpidSession().setAutoSync(true);
-        try
-        {
-            getQpidSession().txCommit();
-        }
-        finally
-        {
-            getQpidSession().setAutoSync(false);
-        }
-        // We need to sync so that we get notify of an error.
-        sync();
-    }
-
     /**
      * Create a queue with a given name.
      *
@@ -463,6 +444,14 @@ public class AMQSession_0_10 extends AMQ
     public void sendRecover() throws AMQException, FailoverException
     {
         // release all unacked messages
+        RangeSet ranges = gatherUnackedRangeSet();
+        getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
+        // We need to sync so that we get notify of an error.
+        sync();
+    }
+
+    private RangeSet gatherUnackedRangeSet()
+    {
         RangeSet ranges = new RangeSet();
         while (true)
         {
@@ -471,11 +460,11 @@ public class AMQSession_0_10 extends AMQ
             {
                 break;
             }
-            ranges.add((int) (long) tag);
+
+            ranges.add(tag.intValue());
         }
-        getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
-        // We need to sync so that we get notify of an error.
-        sync();
+
+        return ranges;
     }
 
 
@@ -997,32 +986,26 @@ public class AMQSession_0_10 extends AMQ
         }
     }
 
-    @Override
-    public void commit() throws JMSException
+    public void commitImpl() throws AMQException, FailoverException, TransportException
     {
-        checkTransacted();
-        try
-        {
-            if( _txSize > 0 )
-            {
-                messageAcknowledge(_txRangeSet, true);
-                _txRangeSet.clear();
-                _txSize = 0;
-            }
-            sendCommit();
-        }
-        catch(TransportException e)
+        if( _txSize > 0 )
         {
-            throw toJMSException("Session exception occured while trying to commit: " + e.getMessage(), e);
+            messageAcknowledge(_txRangeSet, true);
+            _txRangeSet.clear();
+            _txSize = 0;
         }
-        catch (AMQException e)
+
+        getQpidSession().setAutoSync(true);
+        try
         {
-            throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
+            getQpidSession().txCommit();
         }
-        catch (FailoverException e)
+        finally
         {
-            throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
+            getQpidSession().setAutoSync(false);
         }
+        // We need to sync so that we get notify of an error.
+        sync();
     }
 
     protected final boolean tagLE(long tag1, long tag2)
@@ -1385,4 +1368,14 @@ public class AMQSession_0_10 extends AMQ
         return sb.toString();
     }
 
+    protected void acknowledgeImpl()
+    {
+        RangeSet range = gatherUnackedRangeSet();
+
+        if(range.size() > 0 )
+        {
+            messageAcknowledge(range, true);
+            getQpidSession().sync();
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1179695&r1=1179694&r2=1179695&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Thu Oct  6 16:29:27 2011
@@ -76,6 +76,7 @@ import org.apache.qpid.framing.amqp_0_91
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.transport.TransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -125,6 +126,20 @@ public final class AMQSession_0_8 extend
         return getProtocolHandler().getProtocolVersion();
     }
 
+    protected void acknowledgeImpl()
+    {
+        while (true)
+        {
+            Long tag = _unacknowledgedMessageTags.poll();
+            if (tag == null)
+            {
+                break;
+            }
+
+            acknowledgeMessage(tag, false);
+        }
+    }
+
     public void acknowledgeMessage(long deliveryTag, boolean multiple)
     {
         BasicAckBody body = getMethodRegistry().createBasicAckBody(deliveryTag, multiple);
@@ -170,8 +185,20 @@ public final class AMQSession_0_8 extend
         }
     }
 
-    public void sendCommit() throws AMQException, FailoverException
+    public void commitImpl() throws AMQException, FailoverException, TransportException
     {
+        // Acknowledge all delivered messages
+        while (true)
+        {
+            Long tag = _deliveredMessageTags.poll();
+            if (tag == null)
+            {
+                break;
+            }
+
+            acknowledgeMessage(tag, false);
+        }
+
         final AMQProtocolHandler handler = getProtocolHandler();
 
         handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class);

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1179695&r1=1179694&r2=1179695&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Oct  6 16:29:27 2011
@@ -37,10 +37,7 @@ import javax.jms.MessageListener;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
-import java.util.SortedSet;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -118,29 +115,10 @@ public abstract class BasicMessageConsum
     protected final int _acknowledgeMode;
 
     /**
-     * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
-     */
-    private int _outstanding;
-
-    /**
-     * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding
-     * number of msgs >= _prefetchHigh and disabled at < _prefetchLow
-     */
-    private boolean _dups_ok_acknowledge_send;
-
-    /**
      * List of tags delievered, The last of which which should be acknowledged on commit in transaction mode.
      */
     private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
 
-    /** The last tag that was "multiple" acknowledged on this session (if transacted) */
-    private long _lastAcked;
-
-    /** set of tags which have previously been acked; but not part of the multiple ack (transacted mode only) */
-    private final SortedSet<Long> _previouslyAcked = new TreeSet<Long>();
-
-    private final Object _commitLock = new Object();
-
     /**
      * The thread that was used to call receive(). This is important for being able to interrupt that thread if a
      * receive() is in progress.
@@ -290,17 +268,6 @@ public abstract class BasicMessageConsum
         }
     }
 
-    protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
-    {
-        if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
-        {
-            _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
-        }
-        
-        _session.setInRecovery(false);
-        preDeliver(jmsMsg);
-    }
-
     /**
      * @param immediate if true then return immediately if the connection is failing over
      *
@@ -409,7 +376,7 @@ public abstract class BasicMessageConsum
             final AbstractJMSMessage m = returnMessageOrThrow(o);
             if (m != null)
             {
-                preApplicationProcessing(m);
+                preDeliver(m);
                 postDeliver(m);
             }
             return m;
@@ -482,7 +449,7 @@ public abstract class BasicMessageConsum
             final AbstractJMSMessage m = returnMessageOrThrow(o);
             if (m != null)
             {
-                preApplicationProcessing(m);
+                preDeliver(m);
                 postDeliver(m);
             }
 
@@ -734,7 +701,7 @@ public abstract class BasicMessageConsum
         {
             if (isMessageListenerSet())
             {
-                preApplicationProcessing(jmsMessage);
+                preDeliver(jmsMessage);
                 getMessageListener().onMessage(jmsMessage);
                 postDeliver(jmsMessage);
             }
@@ -758,19 +725,28 @@ public abstract class BasicMessageConsum
         }
     }
 
-    void preDeliver(AbstractJMSMessage msg)
+    protected void preDeliver(AbstractJMSMessage msg)
     {
+        _session.setInRecovery(false);
+
         switch (_acknowledgeMode)
         {
-
             case Session.PRE_ACKNOWLEDGE:
                 _session.acknowledgeMessage(msg.getDeliveryTag(), false);
                 break;
-
             case Session.CLIENT_ACKNOWLEDGE:
-                // we set the session so that when the user calls acknowledge() it can call the method on session
-                // to send out the appropriate frame
-                msg.setAMQSession(_session);
+                if (isNoConsume())
+                {
+                    _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+                }
+                else
+                {
+                    // we set the session so that when the user calls acknowledge() it can call the method on session
+                    // to send out the appropriate frame
+                    msg.setAMQSession(_session);
+                    _session.addUnacknowledgedMessage(msg.getDeliveryTag());
+                    _session.markDirty();
+                }
                 break;
             case Session.SESSION_TRANSACTED:
                 if (isNoConsume())
@@ -792,15 +768,6 @@ public abstract class BasicMessageConsum
     {
         switch (_acknowledgeMode)
         {
-
-            case Session.CLIENT_ACKNOWLEDGE:
-                if (isNoConsume())
-                {
-                    _session.acknowledgeMessage(msg.getDeliveryTag(), false);
-                }
-                _session.markDirty();
-                break;
-
             case Session.DUPS_OK_ACKNOWLEDGE:
             case Session.AUTO_ACKNOWLEDGE:
                 // we do not auto ack a message if the application code called recover()

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1179695&r1=1179694&r2=1179695&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Thu Oct  6 16:29:27 2011
@@ -203,9 +203,11 @@ public class BasicMessageConsumer_0_10 e
         super.notifyMessage(messageFrame);
     }
 
-    @Override protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
+    @Override
+    protected void preDeliver(AbstractJMSMessage jmsMsg)
     {
-        super.preApplicationProcessing(jmsMsg);
+        super.preDeliver(jmsMsg);
+
         if (!_session.getTransacted() && _session.getAcknowledgeMode() != org.apache.qpid.jms.Session.CLIENT_ACKNOWLEDGE)
         {
             _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=1179695&r1=1179694&r2=1179695&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Thu Oct  6 16:29:27 2011
@@ -455,16 +455,6 @@ public abstract class BasicMessageProduc
 
         AbstractJMSMessage message = convertToNativeMessage(origMessage);
 
-        if (_transacted)
-        {
-            if (_session.hasFailedOver() && _session.isDirty())
-            {
-                throw new JMSAMQException("Failover has occurred and session is dirty so unable to send.",
-                                          new AMQSessionDirtyException("Failover has occurred and session is dirty " +
-                                                                       "so unable to send."));
-            }
-        }
-
         UUID messageId = null;
         if (_disableMessageId)
         {

Modified: qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java?rev=1179695&r1=1179694&r2=1179695&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java Thu Oct  6 16:29:27 2011
@@ -64,7 +64,12 @@ public class TestAMQSession extends AMQS
 
     }
 
-    public void sendCommit() throws AMQException, FailoverException
+    public void commitImpl() throws AMQException, FailoverException
+    {
+
+    }
+
+    public void acknowledgeImpl()
     {
 
     }

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java?rev=1179695&r1=1179694&r2=1179695&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java Thu Oct  6 16:29:27 2011
@@ -63,7 +63,7 @@ public class SyncWaitTimeoutDelayTest ex
         catch (JMSException e)
         {
             assertTrue("Wrong exception type received.", e.getLinkedException() instanceof AMQTimeoutException);
-            assertTrue("Wrong message received on exception.", e.getMessage().startsWith("Failed to commit"));
+            assertTrue("Wrong message received on exception.", e.getMessage().startsWith("Exception during commit"));
             // As we are using Nano time ensure to multiply up the millis.            
             assertTrue("Timeout was more than 30s default", (System.nanoTime() - start) < (1000000L * 1000 * 30));
         }

Added: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/ClientAcknowledgeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/ClientAcknowledgeTest.java?rev=1179695&view=auto
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/ClientAcknowledgeTest.java (added)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/ClientAcknowledgeTest.java Thu Oct  6 16:29:27 2011
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.test.unit.ack;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class ClientAcknowledgeTest extends QpidBrokerTestCase
+{
+    private static final long ONE_DAY_MS = 1000l * 60 * 60 * 24;
+    private Connection _connection;
+    private Queue _queue;
+    private Session _consumerSession;
+    private MessageConsumer _consumer;
+    private MessageProducer _producer;
+
+    @Override
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        _queue = getTestQueue();
+        _connection = getConnection();
+    }
+
+    /**
+     * Test that message.acknowledge actually acknowledges, regardless of
+     * the flusher thread period, by restarting the broker after calling
+     * acknowledge, and then verifying after restart that the message acked
+     * is no longer present. This test requires a persistent store.
+     */
+    public void testClientAckWithLargeFlusherPeriod() throws Exception
+    {
+        setTestClientSystemProperty("qpid.session.max_ack_delay", Long.toString(ONE_DAY_MS));
+        _consumerSession = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        _consumer = _consumerSession.createConsumer(_queue);
+        _connection.start();
+
+        _producer = _consumerSession.createProducer(_queue);
+        _producer.send(createNextMessage(_consumerSession, 1));
+        _producer.send(createNextMessage(_consumerSession, 2));
+
+        Message message = _consumer.receive(1000l);
+        assertNotNull("Message has not been received", message);
+        assertEquals("Unexpected message is received", 1, message.getIntProperty(INDEX));
+        message.acknowledge();
+
+        //restart broker to allow verification of the acks
+        //without explicitly closing connection (which acks)
+        restartBroker();
+
+        // try to receive the message again, which should fail (as it was ackd)
+        _connection = getConnection();
+        _connection.start();
+        _consumerSession = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        _consumer = _consumerSession.createConsumer(_queue);
+        message = _consumer.receive(1000l);
+        assertNotNull("Message has not been received", message);
+        assertEquals("Unexpected message is received", 2, message.getIntProperty(INDEX));
+    }
+}

Modified: qpid/trunk/qpid/java/test-profiles/CPPExcludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/CPPExcludes?rev=1179695&r1=1179694&r2=1179695&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/CPPExcludes (original)
+++ qpid/trunk/qpid/java/test-profiles/CPPExcludes Thu Oct  6 16:29:27 2011
@@ -142,7 +142,6 @@ org.apache.qpid.server.failover.MessageD
 
 // These are recent test additions that are failing with the c++ broker
 // Temporarily disabling until properly investigated.
-org.apache.qpid.test.unit.publish.DirtyTransactedPublishTest#*
 org.apache.qpid.test.unit.ack.FailoverBeforeConsumingRecoverTest#*
 
 org.apache.qpid.test.client.RollbackOrderTest#testOrderingAfterRollbackOnMessage#*

Modified: qpid/trunk/qpid/java/test-profiles/CPPTransientExcludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/CPPTransientExcludes?rev=1179695&r1=1179694&r2=1179695&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/CPPTransientExcludes (original)
+++ qpid/trunk/qpid/java/test-profiles/CPPTransientExcludes Thu Oct  6 16:29:27 2011
@@ -27,3 +27,6 @@ org.apache.qpid.test.unit.xa.TopicTest#t
 org.apache.qpid.test.unit.xa.TopicTest#testRecover
 org.apache.qpid.test.unit.xa.QueueTest#testRecover
 org.apache.qpid.test.unit.xa.QueueTest#testSendAndRecover
+
+// test requires a persistent store
+org.apache.qpid.test.unit.ack.ClientAcknowledgeTest#testClientAckWithLargeFlusherPeriod

Modified: qpid/trunk/qpid/java/test-profiles/Java010Excludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/Java010Excludes?rev=1179695&r1=1179694&r2=1179695&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/Java010Excludes (original)
+++ qpid/trunk/qpid/java/test-profiles/Java010Excludes Thu Oct  6 16:29:27 2011
@@ -55,9 +55,6 @@ org.apache.qpid.server.queue.ProducerFlo
 //QPID-1950 : Commit to test this failure. This is a MINA only failure so it cannot be tested when using 010.
 org.apache.qpid.server.failover.MessageDisappearWithIOExceptionTest#*
 
-//QPID-3421: tests are failing on 0.10 test profile
-org.apache.qpid.test.unit.publish.DirtyTransactedPublishTest#*
-
 //QPID-1864: rollback with subscriptions does not work in 0-10 yet
 org.apache.qpid.test.client.RollbackOrderTest#testOrderingAfterRollbackOnMessage
 

Modified: qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes?rev=1179695&r1=1179694&r2=1179695&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes (original)
+++ qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes Thu Oct  6 16:29:27 2011
@@ -19,6 +19,7 @@
 
 //These tests require a persistent store
 org.apache.qpid.server.store.PersistentStoreTest#*
+org.apache.qpid.test.unit.ack.ClientAcknowledgeTest#testClientAckWithLargeFlusherPeriod
 
 org.apache.qpid.test.unit.ct.DurableSubscriberTest#*
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org