You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2011/10/06 18:29:28 UTC
svn commit: r1179695 - in /qpid/trunk/qpid/java:
client/src/main/java/org/apache/qpid/client/
client/src/test/java/org/apache/qpid/test/unit/message/
systests/src/main/java/org/apache/qpid/test/client/timeouts/
systests/src/main/java/org/apache/qpid/te...
Author: robbie
Date: Thu Oct 6 16:29:27 2011
New Revision: 1179695
URL: http://svn.apache.org/viewvc?rev=1179695&view=rev
Log:
QPID-3526, QPID-3524: make sure the 0-10 client message.acknowledge() actually acknowledges messages immediately, and does so synchronously, adding test to verify behaviour. Split acknowledge() and commit() methods into version specific session implementations for clarity/reuse, align 0-10 and 0-8/9 transacted publishing behaviour, refactor preDeliver and postDeliver methods, remove dead code from consumers.
Applied patch from Oleksandr Rudyy <or...@gmail.com> and myself.
Added:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/ClientAcknowledgeTest.java
Removed:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/publish/DirtyTransactedPublishTest.java
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java
qpid/trunk/qpid/java/test-profiles/CPPExcludes
qpid/trunk/qpid/java/test-profiles/CPPTransientExcludes
qpid/trunk/qpid/java/test-profiles/Java010Excludes
qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1179695&r1=1179694&r2=1179695&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Oct 6 16:29:27 2011
@@ -612,30 +612,27 @@ public abstract class AMQSession<C exten
{
throw new IllegalStateException("Session is already closed");
}
- else if (hasFailedOver())
+ else if (hasFailedOverDirty())
{
+ //perform an implicit recover in this scenario
+ recover();
+
+ //notify the consumer
throw new IllegalStateException("has failed over");
}
- while (true)
+ try
{
- Long tag = _unacknowledgedMessageTags.poll();
- if (tag == null)
- {
- break;
- }
-
- try
- {
- acknowledgeMessage(tag, false);
- }
- catch (TransportException e)
- {
- throw toJMSException("Exception while acknowledging message(s):" + e.getMessage(), e);
- }
+ acknowledgeImpl();
+ }
+ catch (TransportException e)
+ {
+ throw toJMSException("Exception while acknowledging message(s):" + e.getMessage(), e);
}
}
+ protected abstract void acknowledgeImpl() throws JMSException;
+
/**
* Acknowledge one or many messages.
*
@@ -844,42 +841,28 @@ public abstract class AMQSession<C exten
* @throws JMSException If the JMS provider fails to commit the transaction due to some internal error. This does
* not mean that the commit is known to have failed, merely that it is not known whether it
* failed or not.
- * @todo Be aware of possible changes to parameter order as versions change.
*/
public void commit() throws JMSException
{
checkTransacted();
- try
+ //Check that we are clean to commit.
+ if (_failedOverDirty)
{
- //Check that we are clean to commit.
- if (_failedOverDirty)
- {
- rollback();
-
- throw new TransactionRolledBackException("Connection failover has occured since last send. " +
- "Forced rollback");
- }
+ rollback();
+ throw new TransactionRolledBackException("Connection failover has occured with uncommitted transaction activity." +
+ "The session transaction was rolled back.");
+ }
- // Acknowledge all delivered messages
- while (true)
- {
- Long tag = _deliveredMessageTags.poll();
- if (tag == null)
- {
- break;
- }
-
- acknowledgeMessage(tag, false);
- }
- // Commits outstanding messages and acknowledgments
- sendCommit();
+ try
+ {
+ commitImpl();
markClean();
}
catch (AMQException e)
{
- throw new JMSAMQException("Failed to commit: " + e.getMessage() + ":" + e.getCause(), e);
+ throw new JMSAMQException("Exception during commit: " + e.getMessage() + ":" + e.getCause(), e);
}
catch (FailoverException e)
{
@@ -891,8 +874,7 @@ public abstract class AMQSession<C exten
}
}
- public abstract void sendCommit() throws AMQException, FailoverException;
-
+ protected abstract void commitImpl() throws AMQException, FailoverException, TransportException;
public void confirmConsumerCancelled(int consumerTag)
{
@@ -1580,10 +1562,8 @@ public abstract class AMQSession<C exten
abstract public void sync() throws AMQException;
- public int getAcknowledgeMode() throws JMSException
+ public int getAcknowledgeMode()
{
- checkNotClosed();
-
return _acknowledgeMode;
}
@@ -1643,10 +1623,8 @@ public abstract class AMQSession<C exten
return _ticket;
}
- public boolean getTransacted() throws JMSException
+ public boolean getTransacted()
{
- checkNotClosed();
-
return _transacted;
}
@@ -3096,21 +3074,11 @@ public abstract class AMQSession<C exten
*
* @return boolean true if failover has occured.
*/
- public boolean hasFailedOver()
+ public boolean hasFailedOverDirty()
{
return _failedOverDirty;
}
- /**
- * Check to see if any message have been sent in this transaction and have not been commited.
- *
- * @return boolean true if a message has been sent but not commited
- */
- public boolean isDirty()
- {
- return _dirty;
- }
-
public void setTicket(int ticket)
{
_ticket = ticket;
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1179695&r1=1179694&r2=1179695&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Thu Oct 6 16:29:27 2011
@@ -412,25 +412,6 @@ public class AMQSession_0_10 extends AMQ
}
}
-
- /**
- * Commit the receipt and the delivery of all messages exchanged by this session resources.
- */
- public void sendCommit() throws AMQException, FailoverException
- {
- getQpidSession().setAutoSync(true);
- try
- {
- getQpidSession().txCommit();
- }
- finally
- {
- getQpidSession().setAutoSync(false);
- }
- // We need to sync so that we get notify of an error.
- sync();
- }
-
/**
* Create a queue with a given name.
*
@@ -463,6 +444,14 @@ public class AMQSession_0_10 extends AMQ
public void sendRecover() throws AMQException, FailoverException
{
// release all unacked messages
+ RangeSet ranges = gatherUnackedRangeSet();
+ getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
+ // We need to sync so that we are notified of any error.
+ sync();
+ }
+
+ private RangeSet gatherUnackedRangeSet()
+ {
RangeSet ranges = new RangeSet();
while (true)
{
@@ -471,11 +460,11 @@ public class AMQSession_0_10 extends AMQ
{
break;
}
- ranges.add((int) (long) tag);
+
+ ranges.add(tag.intValue());
}
- getQpidSession().messageRelease(ranges, Option.SET_REDELIVERED);
- // We need to sync so that we get notify of an error.
- sync();
+
+ return ranges;
}
@@ -997,32 +986,26 @@ public class AMQSession_0_10 extends AMQ
}
}
- @Override
- public void commit() throws JMSException
+ public void commitImpl() throws AMQException, FailoverException, TransportException
{
- checkTransacted();
- try
- {
- if( _txSize > 0 )
- {
- messageAcknowledge(_txRangeSet, true);
- _txRangeSet.clear();
- _txSize = 0;
- }
- sendCommit();
- }
- catch(TransportException e)
+ if( _txSize > 0 )
{
- throw toJMSException("Session exception occured while trying to commit: " + e.getMessage(), e);
+ messageAcknowledge(_txRangeSet, true);
+ _txRangeSet.clear();
+ _txSize = 0;
}
- catch (AMQException e)
+
+ getQpidSession().setAutoSync(true);
+ try
{
- throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
+ getQpidSession().txCommit();
}
- catch (FailoverException e)
+ finally
{
- throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
+ getQpidSession().setAutoSync(false);
}
+ // We need to sync so that we are notified of any error.
+ sync();
}
protected final boolean tagLE(long tag1, long tag2)
@@ -1385,4 +1368,14 @@ public class AMQSession_0_10 extends AMQ
return sb.toString();
}
+ protected void acknowledgeImpl()
+ {
+ RangeSet range = gatherUnackedRangeSet();
+
+ if(range.size() > 0 )
+ {
+ messageAcknowledge(range, true);
+ getQpidSession().sync();
+ }
+ }
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1179695&r1=1179694&r2=1179695&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Thu Oct 6 16:29:27 2011
@@ -76,6 +76,7 @@ import org.apache.qpid.framing.amqp_0_91
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -125,6 +126,20 @@ public final class AMQSession_0_8 extend
return getProtocolHandler().getProtocolVersion();
}
+ protected void acknowledgeImpl()
+ {
+ while (true)
+ {
+ Long tag = _unacknowledgedMessageTags.poll();
+ if (tag == null)
+ {
+ break;
+ }
+
+ acknowledgeMessage(tag, false);
+ }
+ }
+
public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
BasicAckBody body = getMethodRegistry().createBasicAckBody(deliveryTag, multiple);
@@ -170,8 +185,20 @@ public final class AMQSession_0_8 extend
}
}
- public void sendCommit() throws AMQException, FailoverException
+ public void commitImpl() throws AMQException, FailoverException, TransportException
{
+ // Acknowledge all delivered messages
+ while (true)
+ {
+ Long tag = _deliveredMessageTags.poll();
+ if (tag == null)
+ {
+ break;
+ }
+
+ acknowledgeMessage(tag, false);
+ }
+
final AMQProtocolHandler handler = getProtocolHandler();
handler.syncWrite(getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId), TxCommitOkBody.class);
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=1179695&r1=1179694&r2=1179695&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Oct 6 16:29:27 2011
@@ -37,10 +37,7 @@ import javax.jms.MessageListener;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
-import java.util.SortedSet;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -118,29 +115,10 @@ public abstract class BasicMessageConsum
protected final int _acknowledgeMode;
/**
- * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
- */
- private int _outstanding;
-
- /**
- * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding
- * number of msgs >= _prefetchHigh and disabled at < _prefetchLow
- */
- private boolean _dups_ok_acknowledge_send;
-
- /**
* List of tags delievered, The last of which which should be acknowledged on commit in transaction mode.
*/
private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
- /** The last tag that was "multiple" acknowledged on this session (if transacted) */
- private long _lastAcked;
-
- /** set of tags which have previously been acked; but not part of the multiple ack (transacted mode only) */
- private final SortedSet<Long> _previouslyAcked = new TreeSet<Long>();
-
- private final Object _commitLock = new Object();
-
/**
* The thread that was used to call receive(). This is important for being able to interrupt that thread if a
* receive() is in progress.
@@ -290,17 +268,6 @@ public abstract class BasicMessageConsum
}
}
- protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
- {
- if (_session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
- {
- _session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
- }
-
- _session.setInRecovery(false);
- preDeliver(jmsMsg);
- }
-
/**
* @param immediate if true then return immediately if the connection is failing over
*
@@ -409,7 +376,7 @@ public abstract class BasicMessageConsum
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
- preApplicationProcessing(m);
+ preDeliver(m);
postDeliver(m);
}
return m;
@@ -482,7 +449,7 @@ public abstract class BasicMessageConsum
final AbstractJMSMessage m = returnMessageOrThrow(o);
if (m != null)
{
- preApplicationProcessing(m);
+ preDeliver(m);
postDeliver(m);
}
@@ -734,7 +701,7 @@ public abstract class BasicMessageConsum
{
if (isMessageListenerSet())
{
- preApplicationProcessing(jmsMessage);
+ preDeliver(jmsMessage);
getMessageListener().onMessage(jmsMessage);
postDeliver(jmsMessage);
}
@@ -758,19 +725,28 @@ public abstract class BasicMessageConsum
}
}
- void preDeliver(AbstractJMSMessage msg)
+ protected void preDeliver(AbstractJMSMessage msg)
{
+ _session.setInRecovery(false);
+
switch (_acknowledgeMode)
{
-
case Session.PRE_ACKNOWLEDGE:
_session.acknowledgeMessage(msg.getDeliveryTag(), false);
break;
-
case Session.CLIENT_ACKNOWLEDGE:
- // we set the session so that when the user calls acknowledge() it can call the method on session
- // to send out the appropriate frame
- msg.setAMQSession(_session);
+ if (isNoConsume())
+ {
+ _session.acknowledgeMessage(msg.getDeliveryTag(), false);
+ }
+ else
+ {
+ // we set the session so that when the user calls acknowledge() it can call the method on session
+ // to send out the appropriate frame
+ msg.setAMQSession(_session);
+ _session.addUnacknowledgedMessage(msg.getDeliveryTag());
+ _session.markDirty();
+ }
break;
case Session.SESSION_TRANSACTED:
if (isNoConsume())
@@ -792,15 +768,6 @@ public abstract class BasicMessageConsum
{
switch (_acknowledgeMode)
{
-
- case Session.CLIENT_ACKNOWLEDGE:
- if (isNoConsume())
- {
- _session.acknowledgeMessage(msg.getDeliveryTag(), false);
- }
- _session.markDirty();
- break;
-
case Session.DUPS_OK_ACKNOWLEDGE:
case Session.AUTO_ACKNOWLEDGE:
// we do not auto ack a message if the application code called recover()
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=1179695&r1=1179694&r2=1179695&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Thu Oct 6 16:29:27 2011
@@ -203,9 +203,11 @@ public class BasicMessageConsumer_0_10 e
super.notifyMessage(messageFrame);
}
- @Override protected void preApplicationProcessing(AbstractJMSMessage jmsMsg) throws JMSException
+ @Override
+ protected void preDeliver(AbstractJMSMessage jmsMsg)
{
- super.preApplicationProcessing(jmsMsg);
+ super.preDeliver(jmsMsg);
+
if (!_session.getTransacted() && _session.getAcknowledgeMode() != org.apache.qpid.jms.Session.CLIENT_ACKNOWLEDGE)
{
_session.addUnacknowledgedMessage(jmsMsg.getDeliveryTag());
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=1179695&r1=1179694&r2=1179695&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Thu Oct 6 16:29:27 2011
@@ -455,16 +455,6 @@ public abstract class BasicMessageProduc
AbstractJMSMessage message = convertToNativeMessage(origMessage);
- if (_transacted)
- {
- if (_session.hasFailedOver() && _session.isDirty())
- {
- throw new JMSAMQException("Failover has occurred and session is dirty so unable to send.",
- new AMQSessionDirtyException("Failover has occurred and session is dirty " +
- "so unable to send."));
- }
- }
-
UUID messageId = null;
if (_disableMessageId)
{
Modified: qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java?rev=1179695&r1=1179694&r2=1179695&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java Thu Oct 6 16:29:27 2011
@@ -64,7 +64,12 @@ public class TestAMQSession extends AMQS
}
- public void sendCommit() throws AMQException, FailoverException
+ public void commitImpl() throws AMQException, FailoverException
+ {
+
+ }
+
+ public void acknowledgeImpl()
{
}
Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java?rev=1179695&r1=1179694&r2=1179695&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitTimeoutDelayTest.java Thu Oct 6 16:29:27 2011
@@ -63,7 +63,7 @@ public class SyncWaitTimeoutDelayTest ex
catch (JMSException e)
{
assertTrue("Wrong exception type received.", e.getLinkedException() instanceof AMQTimeoutException);
- assertTrue("Wrong message received on exception.", e.getMessage().startsWith("Failed to commit"));
+ assertTrue("Wrong message received on exception.", e.getMessage().startsWith("Exception during commit"));
// As we are using Nano time ensure to multiply up the millis.
assertTrue("Timeout was more than 30s default", (System.nanoTime() - start) < (1000000L * 1000 * 30));
}
Added: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/ClientAcknowledgeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/ClientAcknowledgeTest.java?rev=1179695&view=auto
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/ClientAcknowledgeTest.java (added)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ack/ClientAcknowledgeTest.java Thu Oct 6 16:29:27 2011
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.test.unit.ack;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class ClientAcknowledgeTest extends QpidBrokerTestCase
+{
+ private static final long ONE_DAY_MS = 1000l * 60 * 60 * 24;
+ private Connection _connection;
+ private Queue _queue;
+ private Session _consumerSession;
+ private MessageConsumer _consumer;
+ private MessageProducer _producer;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _queue = getTestQueue();
+ _connection = getConnection();
+ }
+
+ /**
+ * Test that message.acknowledge actually acknowledges, regardless of
+ * the flusher thread period, by restarting the broker after calling
+ * acknowledge, and then verifying after restart that the message acked
+ * is no longer present. This test requires a persistent store.
+ */
+ public void testClientAckWithLargeFlusherPeriod() throws Exception
+ {
+ setTestClientSystemProperty("qpid.session.max_ack_delay", Long.toString(ONE_DAY_MS));
+ _consumerSession = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ _consumer = _consumerSession.createConsumer(_queue);
+ _connection.start();
+
+ _producer = _consumerSession.createProducer(_queue);
+ _producer.send(createNextMessage(_consumerSession, 1));
+ _producer.send(createNextMessage(_consumerSession, 2));
+
+ Message message = _consumer.receive(1000l);
+ assertNotNull("Message has not been received", message);
+ assertEquals("Unexpected message is received", 1, message.getIntProperty(INDEX));
+ message.acknowledge();
+
+ //restart broker to allow verification of the acks
+ //without explicitly closing connection (which acks)
+ restartBroker();
+
+ // receive again: message 1 was acked before the restart and must not be redelivered, so message 2 is expected
+ _connection = getConnection();
+ _connection.start();
+ _consumerSession = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ _consumer = _consumerSession.createConsumer(_queue);
+ message = _consumer.receive(1000l);
+ assertNotNull("Message has not been received", message);
+ assertEquals("Unexpected message is received", 2, message.getIntProperty(INDEX));
+ }
+}
Modified: qpid/trunk/qpid/java/test-profiles/CPPExcludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/CPPExcludes?rev=1179695&r1=1179694&r2=1179695&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/CPPExcludes (original)
+++ qpid/trunk/qpid/java/test-profiles/CPPExcludes Thu Oct 6 16:29:27 2011
@@ -142,7 +142,6 @@ org.apache.qpid.server.failover.MessageD
// These are recent test additions that are failing with the c++ broker
// Temporarily disabling until properly investigated.
-org.apache.qpid.test.unit.publish.DirtyTransactedPublishTest#*
org.apache.qpid.test.unit.ack.FailoverBeforeConsumingRecoverTest#*
org.apache.qpid.test.client.RollbackOrderTest#testOrderingAfterRollbackOnMessage#*
Modified: qpid/trunk/qpid/java/test-profiles/CPPTransientExcludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/CPPTransientExcludes?rev=1179695&r1=1179694&r2=1179695&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/CPPTransientExcludes (original)
+++ qpid/trunk/qpid/java/test-profiles/CPPTransientExcludes Thu Oct 6 16:29:27 2011
@@ -27,3 +27,6 @@ org.apache.qpid.test.unit.xa.TopicTest#t
org.apache.qpid.test.unit.xa.TopicTest#testRecover
org.apache.qpid.test.unit.xa.QueueTest#testRecover
org.apache.qpid.test.unit.xa.QueueTest#testSendAndRecover
+
+// test requires a persistent store
+org.apache.qpid.test.unit.ack.ClientAcknowledgeTest#testClientAckWithLargeFlusherPeriod
Modified: qpid/trunk/qpid/java/test-profiles/Java010Excludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/Java010Excludes?rev=1179695&r1=1179694&r2=1179695&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/Java010Excludes (original)
+++ qpid/trunk/qpid/java/test-profiles/Java010Excludes Thu Oct 6 16:29:27 2011
@@ -55,9 +55,6 @@ org.apache.qpid.server.queue.ProducerFlo
//QPID-1950 : Commit to test this failure. This is a MINA only failure so it cannot be tested when using 010.
org.apache.qpid.server.failover.MessageDisappearWithIOExceptionTest#*
-//QPID-3421: tests are failing on 0.10 test profile
-org.apache.qpid.test.unit.publish.DirtyTransactedPublishTest#*
-
//QPID-1864: rollback with subscriptions does not work in 0-10 yet
org.apache.qpid.test.client.RollbackOrderTest#testOrderingAfterRollbackOnMessage
Modified: qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes?rev=1179695&r1=1179694&r2=1179695&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes (original)
+++ qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes Thu Oct 6 16:29:27 2011
@@ -19,6 +19,7 @@
//These tests require a persistent store
org.apache.qpid.server.store.PersistentStoreTest#*
+org.apache.qpid.test.unit.ack.ClientAcknowledgeTest#testClientAckWithLargeFlusherPeriod
org.apache.qpid.test.unit.ct.DurableSubscriberTest#*
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org