You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/07/28 00:55:37 UTC
[1/2] activemq-cpp git commit: Add some assert messages and clean up
some formatting.
Repository: activemq-cpp
Updated Branches:
refs/heads/master 9ea1110e7 -> 4822f75de
Add some assert messages and clean up some formatting.
Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/e2ff35a1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/e2ff35a1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/e2ff35a1
Branch: refs/heads/master
Commit: e2ff35a1760fd115f566545627471a29ad541907
Parents: 9ea1110
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Jul 27 18:28:57 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Jul 27 18:28:57 2015 -0400
----------------------------------------------------------------------
.../test/openwire/OpenwireClientAckTest.cpp | 140 +++++++++----------
1 file changed, 70 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/e2ff35a1/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireClientAckTest.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireClientAckTest.cpp b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireClientAckTest.cpp
index bfecb1d..f1fe9b3 100644
--- a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireClientAckTest.cpp
+++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireClientAckTest.cpp
@@ -79,27 +79,27 @@ void OpenwireClientAckTest::testAckedMessageAreConsumed() {
Connection* connection = this->cmsProvider->getConnection();
connection->start();
- std::auto_ptr<Session> session( connection->createSession( Session::CLIENT_ACKNOWLEDGE ) );
- std::auto_ptr<Destination> queue( session->createTemporaryQueue() );
- std::auto_ptr<MessageProducer> producer( session->createProducer( queue.get() ) );
+ std::auto_ptr<Session> session(connection->createSession(Session::CLIENT_ACKNOWLEDGE));
+ std::auto_ptr<Destination> queue(session->createTemporaryQueue());
+ std::auto_ptr<MessageProducer> producer(session->createProducer(queue.get()));
- std::auto_ptr<TextMessage> msg1( session->createTextMessage( "Hello" ) );
- producer->send( msg1.get() );
+ std::auto_ptr<TextMessage> msg1(session->createTextMessage("Hello"));
+ producer->send(msg1.get());
// Consume the message...
- std::auto_ptr<MessageConsumer> consumer( session->createConsumer( queue.get() ) );
- std::auto_ptr<Message> msg( consumer->receive( 1000 ) );
- CPPUNIT_ASSERT( msg.get() != NULL );
+ std::auto_ptr<MessageConsumer> consumer(session->createConsumer(queue.get()));
+ std::auto_ptr<Message> msg(consumer->receive(1000));
+ CPPUNIT_ASSERT(msg.get() != NULL);
msg->acknowledge();
// Reset the session->
session->close();
- session.reset( connection->createSession( Session::CLIENT_ACKNOWLEDGE ) );
+ session.reset(connection->createSession(Session::CLIENT_ACKNOWLEDGE));
// Attempt to Consume the message...
- consumer.reset( session->createConsumer( queue.get() ) );
- msg.reset( consumer->receive( 1000 ) );
- CPPUNIT_ASSERT( msg.get() == NULL );
+ consumer.reset(session->createConsumer(queue.get()));
+ msg.reset(consumer->receive(1000));
+ CPPUNIT_ASSERT(msg.get() == NULL);
session->close();
}
@@ -110,35 +110,35 @@ void OpenwireClientAckTest::testLastMessageAcked() {
Connection* connection = this->cmsProvider->getConnection();
connection->start();
- std::auto_ptr<Session> session( connection->createSession( Session::CLIENT_ACKNOWLEDGE ) );
- std::auto_ptr<Destination> queue( session->createTemporaryQueue() );
- std::auto_ptr<MessageProducer> producer( session->createProducer( queue.get() ) );
+ std::auto_ptr<Session> session(connection->createSession(Session::CLIENT_ACKNOWLEDGE));
+ std::auto_ptr<Destination> queue(session->createTemporaryQueue());
+ std::auto_ptr<MessageProducer> producer(session->createProducer(queue.get()));
- std::auto_ptr<TextMessage> msg1( session->createTextMessage( "Hello1" ) );
- std::auto_ptr<TextMessage> msg2( session->createTextMessage( "Hello2" ) );
- std::auto_ptr<TextMessage> msg3( session->createTextMessage( "Hello3" ) );
- producer->send( msg1.get() );
- producer->send( msg2.get() );
- producer->send( msg3.get() );
+ std::auto_ptr<TextMessage> msg1(session->createTextMessage("Hello1"));
+ std::auto_ptr<TextMessage> msg2(session->createTextMessage("Hello2"));
+ std::auto_ptr<TextMessage> msg3(session->createTextMessage("Hello3"));
+ producer->send(msg1.get());
+ producer->send(msg2.get());
+ producer->send(msg3.get());
// Consume the message...
- std::auto_ptr<MessageConsumer> consumer( session->createConsumer( queue.get() ) );
- std::auto_ptr<Message> msg( consumer->receive( 1000 ) );
- CPPUNIT_ASSERT( msg.get() != NULL );
- msg.reset( consumer->receive( 1000 ) );
- CPPUNIT_ASSERT( msg.get() != NULL );
- msg.reset( consumer->receive( 1000 ) );
- CPPUNIT_ASSERT( msg.get() != NULL );
+ std::auto_ptr<MessageConsumer> consumer(session->createConsumer(queue.get()));
+ std::auto_ptr<Message> msg(consumer->receive(1000));
+ CPPUNIT_ASSERT(msg.get() != NULL);
+ msg.reset(consumer->receive(1000));
+ CPPUNIT_ASSERT(msg.get() != NULL);
+ msg.reset(consumer->receive(1000));
+ CPPUNIT_ASSERT(msg.get() != NULL);
msg->acknowledge();
// Reset the session->
session->close();
- session.reset( connection->createSession( Session::CLIENT_ACKNOWLEDGE ) );
+ session.reset(connection->createSession(Session::CLIENT_ACKNOWLEDGE));
// Attempt to Consume the message...
- consumer.reset( session->createConsumer( queue.get() ) );
- msg.reset( consumer->receive( 1000 ) );
- CPPUNIT_ASSERT( msg.get() == NULL );
+ consumer.reset(session->createConsumer(queue.get()));
+ msg.reset(consumer->receive(1000));
+ CPPUNIT_ASSERT(msg.get() == NULL);
session->close();
}
@@ -149,27 +149,27 @@ void OpenwireClientAckTest::testUnAckedMessageAreNotConsumedOnSessionClose() {
Connection* connection = this->cmsProvider->getConnection();
connection->start();
- std::auto_ptr<Session> session( connection->createSession( Session::CLIENT_ACKNOWLEDGE ) );
- std::auto_ptr<Destination> queue( session->createTemporaryQueue() );
- std::auto_ptr<MessageProducer> producer( session->createProducer( queue.get() ) );
+ std::auto_ptr<Session> session(connection->createSession(Session::CLIENT_ACKNOWLEDGE));
+ std::auto_ptr<Destination> queue(session->createTemporaryQueue());
+ std::auto_ptr<MessageProducer> producer(session->createProducer(queue.get()));
- std::auto_ptr<TextMessage> msg1( session->createTextMessage( "Hello" ) );
- producer->send( msg1.get() );
+ std::auto_ptr<TextMessage> msg1(session->createTextMessage("Hello"));
+ producer->send(msg1.get());
// Consume the message...
- std::auto_ptr<MessageConsumer> consumer( session->createConsumer( queue.get() ) );
- std::auto_ptr<Message> msg( consumer->receive( 1000 ) );
- CPPUNIT_ASSERT( msg.get() != NULL );
+ std::auto_ptr<MessageConsumer> consumer(session->createConsumer(queue.get()));
+ std::auto_ptr<Message> msg(consumer->receive(1000));
+ CPPUNIT_ASSERT_MESSAGE("Consumer did not get message on first receive()", msg.get() != NULL);
// Don't ack the message.
// Reset the session-> This should cause the unacknowledged message to be re-delivered.
session->close();
- session.reset( connection->createSession( Session::CLIENT_ACKNOWLEDGE ) );
+ session.reset(connection->createSession(Session::CLIENT_ACKNOWLEDGE));
// Attempt to Consume the message...
- consumer.reset( session->createConsumer( queue.get() ) );
- msg.reset( consumer->receive( 1000 ) );
- CPPUNIT_ASSERT( msg.get() != NULL );
+ consumer.reset(session->createConsumer(queue.get()));
+ msg.reset(consumer->receive(1000));
+ CPPUNIT_ASSERT_MESSAGE("Consumer did not get message on second receive()", msg.get() != NULL);
msg->acknowledge();
session->close();
@@ -181,30 +181,30 @@ void OpenwireClientAckTest::testAckedMessageAreConsumedAsync() {
Connection* connection = this->cmsProvider->getConnection();
connection->start();
- MyMesageListener listener( false );
+ MyMesageListener listener(false);
- std::auto_ptr<Session> session( connection->createSession( Session::CLIENT_ACKNOWLEDGE ) );
- std::auto_ptr<Destination> queue( session->createTemporaryQueue() );
- std::auto_ptr<MessageProducer> producer( session->createProducer( queue.get() ) );
+ std::auto_ptr<Session> session(connection->createSession(Session::CLIENT_ACKNOWLEDGE));
+ std::auto_ptr<Destination> queue(session->createTemporaryQueue());
+ std::auto_ptr<MessageProducer> producer(session->createProducer(queue.get()));
- std::auto_ptr<TextMessage> msg1( session->createTextMessage( "Hello" ) );
- producer->send( msg1.get() );
+ std::auto_ptr<TextMessage> msg1(session->createTextMessage("Hello"));
+ producer->send(msg1.get());
// Consume the message...
- std::auto_ptr<MessageConsumer> consumer( session->createConsumer( queue.get() ) );
- consumer->setMessageListener( &listener );
+ std::auto_ptr<MessageConsumer> consumer(session->createConsumer(queue.get()));
+ consumer->setMessageListener(&listener);
- Thread::sleep( 5000 );
+ Thread::sleep(5000);
// Reset the session->
session->close();
- session.reset( connection->createSession( Session::CLIENT_ACKNOWLEDGE ) );
+ session.reset(connection->createSession(Session::CLIENT_ACKNOWLEDGE));
// Attempt to Consume the message...
- consumer.reset( session->createConsumer( queue.get() ) );
- std::auto_ptr<Message> msg( consumer->receive( 1000 ) );
- CPPUNIT_ASSERT( msg.get() == NULL );
+ consumer.reset(session->createConsumer(queue.get()));
+ std::auto_ptr<Message> msg(consumer->receive(1000));
+ CPPUNIT_ASSERT(msg.get() == NULL);
session->close();
}
@@ -216,30 +216,30 @@ void OpenwireClientAckTest::testUnAckedMessageAreNotConsumedOnSessionCloseAsync(
connection->start();
// Don't send an ack
- MyMesageListener listener( true );
+ MyMesageListener listener(true);
- std::auto_ptr<Session> session( connection->createSession( Session::CLIENT_ACKNOWLEDGE ) );
- std::auto_ptr<Destination> queue( session->createTemporaryQueue() );
- std::auto_ptr<MessageProducer> producer( session->createProducer( queue.get() ) );
+ std::auto_ptr<Session> session(connection->createSession(Session::CLIENT_ACKNOWLEDGE));
+ std::auto_ptr<Destination> queue(session->createTemporaryQueue());
+ std::auto_ptr<MessageProducer> producer(session->createProducer(queue.get()));
- std::auto_ptr<TextMessage> msg1( session->createTextMessage( "Hello" ) );
- producer->send( msg1.get() );
+ std::auto_ptr<TextMessage> msg1(session->createTextMessage("Hello"));
+ producer->send(msg1.get());
// Consume the message...
- std::auto_ptr<MessageConsumer> consumer( session->createConsumer( queue.get() ) );
- consumer->setMessageListener( &listener );
+ std::auto_ptr<MessageConsumer> consumer(session->createConsumer(queue.get()));
+ consumer->setMessageListener(&listener);
// Don't ack the message.
// Reset the session-> This should cause the Unacked message to be redelivered.
session->close();
- Thread::sleep( 5000 );
- session.reset( connection->createSession( Session::CLIENT_ACKNOWLEDGE ) );
+ Thread::sleep(5000);
+ session.reset(connection->createSession(Session::CLIENT_ACKNOWLEDGE));
// Attempt to Consume the message...
- consumer.reset( session->createConsumer( queue.get() ) );
- std::auto_ptr<Message> msg( consumer->receive( 2000 ) );
- CPPUNIT_ASSERT( msg.get() != NULL );
+ consumer.reset(session->createConsumer(queue.get()));
+ std::auto_ptr<Message> msg(consumer->receive(2000));
+ CPPUNIT_ASSERT(msg.get() != NULL);
msg->acknowledge();
session->close();
[2/2] activemq-cpp git commit:
https://issues.apache.org/jira/browse/AMQCPP-552
https://issues.apache.org/jira/browse/AMQCPP-577
Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQCPP-552
https://issues.apache.org/jira/browse/AMQCPP-577
Port all current MessageConsumer fixes from Java client to CMS
Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/4822f75d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/4822f75d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/4822f75d
Branch: refs/heads/master
Commit: 4822f75deb774db91cdb0db8207b5961aae430b1
Parents: e2ff35a
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Jul 27 18:55:17 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Jul 27 18:55:17 2015 -0400
----------------------------------------------------------------------
.../main/activemq/core/ActiveMQConnection.cpp | 16 +
.../src/main/activemq/core/ActiveMQConnection.h | 11 +-
.../core/kernels/ActiveMQConsumerKernel.cpp | 426 +++++++++++--------
.../core/kernels/ActiveMQConsumerKernel.h | 12 +-
.../core/kernels/ActiveMQSessionKernel.cpp | 15 +
.../core/kernels/ActiveMQSessionKernel.h | 10 +
.../src/test-integration/TestRegistry.cpp | 2 +-
.../activemq/util/IntegrationCommon.cpp | 2 +-
8 files changed, 303 insertions(+), 191 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/4822f75d/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
index 951f61e..535d7f3 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
@@ -1679,6 +1679,22 @@ ExecutorService* ActiveMQConnection::getExecutor() const {
}
////////////////////////////////////////////////////////////////////////////////
+ArrayList< Pointer<ActiveMQSessionKernel> > ActiveMQConnection::getSessions() const {
+ ArrayList< Pointer<ActiveMQSessionKernel> > result;
+
+ this->config->sessionsLock.readLock().lock();
+ try {
+ result.addAll(this->config->activeSessions);
+ this->config->sessionsLock.readLock().unlock();
+ } catch (Exception& ex) {
+ this->config->sessionsLock.readLock().unlock();
+ throw;
+ }
+
+ return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConnection::isWatchTopicAdvisories() const {
return this->config->watchTopicAdvisories;
}
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/4822f75d/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
index 26672a1..573a44f 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
@@ -32,6 +32,7 @@
#include <activemq/core/kernels/ActiveMQProducerKernel.h>
#include <activemq/core/kernels/ActiveMQSessionKernel.h>
#include <decaf/util/Properties.h>
+#include <decaf/util/ArrayList.h>
#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
#include <decaf/util/concurrent/ExecutorService.h>
#include <decaf/lang/exceptions/UnsupportedOperationException.h>
@@ -1070,10 +1071,18 @@ namespace core {
* Determines whether the supplied Temporary Destination has already been deleted from the
* Broker. If watchTopicAdvisories is disabled this method will always return false.
*
- * @returns true if the temporary destination was deleted already.
+ * @return true if the temporary destination was deleted already.
*/
bool isDeleted(Pointer<commands::ActiveMQTempDestination> destination) const;
+ /**
+ * Returns an ArrayList that contains a copy of all Sessions that are
+ * currently active in the Connection
+ *
+ * @return an ArrayList of Sessions active in this connection.
+ */
+ decaf::util::ArrayList< Pointer<activemq::core::kernels::ActiveMQSessionKernel> > getSessions() const;
+
protected:
/**
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/4822f75d/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
index ad9d77a..57b9bc2 100644
--- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
+++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
@@ -29,6 +29,7 @@
#include <decaf/util/Collections.h>
#include <decaf/util/concurrent/ExecutorService.h>
#include <decaf/util/concurrent/Executors.h>
+#include <decaf/util/concurrent/atomic/AtomicInteger.h>
#include <activemq/util/Config.h>
#include <activemq/util/CMSExceptionSupport.h>
#include <activemq/util/ActiveMQProperties.h>
@@ -65,6 +66,7 @@ using namespace decaf::lang;
using namespace decaf::lang::exceptions;
using namespace decaf::util;
using namespace decaf::util::concurrent;
+using namespace decaf::util::concurrent::atomic;
////////////////////////////////////////////////////////////////////////////////
namespace activemq {
@@ -99,14 +101,14 @@ namespace kernels {
AtomicBoolean started;
AtomicBoolean closeSyncRegistered;
Pointer<MessageDispatchChannel> unconsumedMessages;
- decaf::util::LinkedList< decaf::lang::Pointer<commands::MessageDispatch> > dispatchedMessages;
+ decaf::util::LinkedList< decaf::lang::Pointer<commands::MessageDispatch> > deliveredMessages;
long long lastDeliveredSequenceId;
Pointer<commands::MessageAck> pendingAck;
int deliveredCounter;
int additionalWindowSize;
volatile bool synchronizationRegistered;
- bool clearDispatchList;
- bool inProgressClearRequiredFlag;
+ volatile bool isClearDeliveredList;
+ AtomicInteger inProgressClearRequiredFlag;
long long redeliveryDelay;
Pointer<RedeliveryPolicy> redeliveryPolicy;
Pointer<Exception> failureError;
@@ -137,14 +139,14 @@ namespace kernels {
started(),
closeSyncRegistered(),
unconsumedMessages(),
- dispatchedMessages(),
+ deliveredMessages(),
lastDeliveredSequenceId(-1),
pendingAck(),
deliveredCounter(0),
additionalWindowSize(0),
synchronizationRegistered(false),
- clearDispatchList(false),
- inProgressClearRequiredFlag(false),
+ isClearDeliveredList(false),
+ inProgressClearRequiredFlag(0),
redeliveryDelay(0),
redeliveryPolicy(),
failureError(),
@@ -182,63 +184,46 @@ namespace kernels {
return false;
}
- void doClearMessagesInProgress() {
- if (this->inProgressClearRequiredFlag) {
- synchronized(this->unconsumedMessages.get()) {
- if (this->inProgressClearRequiredFlag) {
-
- // ensure messages that were not yet consumed are rolled back up front as they
- // may get redelivered to another consumer by the Broker.
- std::vector< Pointer<MessageDispatch> > list = this->unconsumedMessages->removeAll();
- if (!this->info->isBrowser()) {
- std::vector<Pointer<MessageDispatch> >::const_iterator iter = list.begin();
-
- for (; iter != list.end(); ++iter) {
- Pointer<MessageDispatch> md = *iter;
- this->session->getConnection()->rollbackDuplicate(this->parent, md->getMessage());
- }
- }
-
- // allow dispatch on this connection to resume
- this->session->getConnection()->setTransportInterruptionProcessingComplete();
- this->inProgressClearRequiredFlag = false;
-
- // Wake up any blockers and allow them to recheck state.
- this->unconsumedMessages->notifyAll();
- }
- }
- }
- }
-
- void doClearDispatchList() {
- if (clearDispatchList) {
- synchronized (&this->dispatchedMessages) {
- if (clearDispatchList) {
- if (!dispatchedMessages.isEmpty()) {
+ void clearDeliveredList() {
+ if (isClearDeliveredList) {
+ synchronized (&this->deliveredMessages) {
+ if (isClearDeliveredList) {
+ if (!deliveredMessages.isEmpty()) {
if (session->isTransacted()) {
if (previouslyDeliveredMessages == NULL) {
previouslyDeliveredMessages.reset(new PreviouslyDeliveredMap(
session->getTransactionContext()->getTransactionId()));
}
- Pointer<Iterator<Pointer<MessageDispatch> > > iter(dispatchedMessages.iterator());
+ Pointer<Iterator<Pointer<MessageDispatch> > > iter(deliveredMessages.iterator());
while (iter->hasNext()) {
Pointer<MessageDispatch> dispatch = iter->next();
previouslyDeliveredMessages->put(dispatch->getMessage()->getMessageId(), false);
}
} else {
- dispatchedMessages.clear();
+ if (session->isClientAcknowledge() || session->isIndividualAcknowledge()) {
+ if (!info->isBrowser()) {
+ Pointer<Iterator<Pointer<MessageDispatch> > > iter(deliveredMessages.iterator());
+
+ // allow redelivery
+ while (iter->hasNext()) {
+ Pointer<MessageDispatch> dispatch = iter->next();
+ session->getConnection()->rollbackDuplicate(parent, dispatch->getMessage());
+ }
+ }
+ }
+ deliveredMessages.clear();
pendingAck.reset(NULL);
}
}
- clearDispatchList = false;
+ isClearDeliveredList = false;
}
}
}
}
- void doClearPreviouslyDelivered() {
+ void clearPreviouslyDelivered() {
if (previouslyDeliveredMessages != NULL) {
previouslyDeliveredMessages->clear();
previouslyDeliveredMessages.reset(NULL);
@@ -247,7 +232,7 @@ namespace kernels {
// called with deliveredMessages locked
void removeFromDeliveredMessages(Pointer<MessageId> key) {
- Pointer< Iterator< Pointer<MessageDispatch> > > iter(this->dispatchedMessages.iterator());
+ Pointer< Iterator< Pointer<MessageDispatch> > > iter(this->deliveredMessages.iterator());
while (iter->hasNext()) {
Pointer<MessageDispatch> candidate = iter->next();
if (key->equals(candidate->getMessage()->getMessageId().get())) {
@@ -262,7 +247,6 @@ namespace kernels {
// not re-delivered as they can't be replayed to this consumer on rollback
void rollbackPreviouslyDeliveredAndNotRedelivered() {
if (previouslyDeliveredMessages != NULL) {
-
Set<MapEntry<Pointer<MessageId>, bool> >& entries = previouslyDeliveredMessages->entrySet();
Pointer<Iterator<MapEntry<Pointer<MessageId>, bool> > > iter(entries.iterator());
while (iter->hasNext()) {
@@ -272,7 +256,7 @@ namespace kernels {
}
}
- doClearPreviouslyDelivered();
+ clearPreviouslyDelivered();
}
}
@@ -306,7 +290,7 @@ namespace kernels {
int numberNotReplayed;
do {
numberNotReplayed = 0;
- synchronized (&this->dispatchedMessages) {
+ synchronized (&this->deliveredMessages) {
if (previouslyDeliveredMessages != NULL) {
Set<MapEntry<Pointer<MessageId>, bool> >& entries = previouslyDeliveredMessages->entrySet();
Pointer<Iterator<MapEntry<Pointer<MessageId>, bool> > > iter(entries.iterator());
@@ -329,6 +313,40 @@ namespace kernels {
}
}
+ bool redeliveryExpectedInCurrentTransaction(Pointer<MessageDispatch> dispatch, bool markReceipt) {
+ if (session->isTransacted()) {
+ synchronized (&this->deliveredMessages) {
+ if (previouslyDeliveredMessages != NULL) {
+ if (previouslyDeliveredMessages->containsKey(dispatch->getMessage()->getMessageId())) {
+ if (markReceipt) {
+ previouslyDeliveredMessages->put(dispatch->getMessage()->getMessageId(), true);
+ }
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ bool redeliveryPendingInCompetingTransaction(Pointer<MessageDispatch> dispatch) {
+ ArrayList< Pointer<ActiveMQSessionKernel> > sessions = session->getConnection()->getSessions();
+
+ Pointer<Iterator<Pointer<ActiveMQSessionKernel> > > sessionIter(sessions.iterator());
+ while (sessionIter->hasNext()) {
+ Pointer<ActiveMQSessionKernel> session = sessionIter->next();
+ ArrayList< Pointer<ActiveMQConsumerKernel> > consumers = session->getConsumers();
+ Pointer<Iterator<Pointer<ActiveMQConsumerKernel> > > consumersIter(consumers.iterator());
+
+ while (consumersIter->hasNext()) {
+ Pointer<ActiveMQConsumerKernel> consumer = consumersIter->next();
+ return consumer->isRedeliveryExpectedInCurrentTransaction(dispatch);
+ }
+ }
+
+ return false;
+ }
+
bool consumeExpiredMessage(const Pointer<MessageDispatch> dispatch) {
if (dispatch->getMessage()->isExpired()) {
return !info->isBrowser() && consumerExpiryCheckEnabled;
@@ -336,6 +354,32 @@ namespace kernels {
return false;
}
+
+ bool redeliveryExceeded(Pointer<MessageDispatch> dispatch) {
+ try {
+ return session->isTransacted() && redeliveryPolicy != NULL &&
+ redeliveryPolicy->getMaximumRedeliveries() != RedeliveryPolicy::NO_MAXIMUM_REDELIVERIES &&
+ dispatch->getRedeliveryCounter() > redeliveryPolicy->getMaximumRedeliveries() &&
+ // redeliveryCounter > x expected after resend via brokerRedeliveryPlugin
+ !dispatch->getMessage()->getMessageProperties().containsKey("redeliveryDelay");
+ } catch (Exception& ignored) {
+ return false;
+ }
+ }
+
+ void posionAck(Pointer<MessageDispatch> dispatch, const std::string& cause) {
+ Pointer<MessageAck> poisonAck(new MessageAck(dispatch, ActiveMQConstants::ACK_TYPE_POISON, 1));
+ poisonAck->setFirstMessageId(dispatch->getMessage()->getMessageId());
+ poisonAck->setPoisonCause(createBrokerError(cause));
+ session->sendAck(poisonAck);
+ }
+
+ Pointer<BrokerError> createBrokerError(const std::string& message) {
+ Pointer<BrokerError> cause(new BrokerError());
+ cause->setExceptionClass("javax.jms.JMSException");
+ cause->setMessage(message);
+ return cause;
+ }
};
}}}
@@ -373,9 +417,9 @@ namespace {
virtual void beforeEnd() {
if (impl->transactedIndividualAck) {
- impl->doClearDispatchList();
+ impl->clearDeliveredList();
impl->waitForRedeliveries();
- synchronized(&impl->dispatchedMessages) {
+ synchronized(&impl->deliveredMessages) {
impl->rollbackOnFailedRecoveryRedelivery();
}
} else {
@@ -622,7 +666,7 @@ namespace {
NonBlockingRedeliveryTask(ActiveMQSessionKernel* session, Pointer<ActiveMQConsumerKernel> consumer, ActiveMQConsumerKernelConfig* impl) :
Runnable(), session(session), consumer(consumer), impl(impl), redeliveries() {
- this->redeliveries.copy(impl->dispatchedMessages);
+ this->redeliveries.copy(impl->deliveredMessages);
Collections::reverse(this->redeliveries);
}
virtual ~NonBlockingRedeliveryTask() {}
@@ -714,15 +758,6 @@ ActiveMQConsumerKernel::ActiveMQConsumerKernel(ActiveMQSessionKernel* session,
this->internal->session = session;
this->internal->parent = this;
this->internal->info = consumerInfo;
- this->internal->hashCode = id->getHashCode();
- this->internal->lastDeliveredSequenceId = -1;
- this->internal->synchronizationRegistered = false;
- this->internal->additionalWindowSize = 0;
- this->internal->deliveredCounter = 0;
- this->internal->clearDispatchList = false;
- this->internal->inProgressClearRequiredFlag = false;
- this->internal->listener = NULL;
- this->internal->redeliveryDelay = 0;
this->internal->redeliveryPolicy.reset(this->session->getConnection()->getRedeliveryPolicy()->clone());
this->internal->scheduler = this->session->getScheduler();
@@ -768,7 +803,6 @@ ActiveMQConsumerKernel::ActiveMQConsumerKernel(ActiveMQSessionKernel* session,
////////////////////////////////////////////////////////////////////////////////
ActiveMQConsumerKernel::~ActiveMQConsumerKernel() {
-
try {
this->close();
}
@@ -809,7 +843,7 @@ void ActiveMQConsumerKernel::close() {
try {
if (!this->isClosed()) {
- if (!this->internal->dispatchedMessages.isEmpty() &&
+ if (!this->internal->deliveredMessages.isEmpty() &&
this->session->getTransactionContext() != NULL &&
this->session->getTransactionContext()->isInTransaction() &&
this->internal->closeSyncRegistered.compareAndSet(false, true)) {
@@ -882,8 +916,8 @@ void ActiveMQConsumerKernel::dispose() {
if (!this->consumerInfo->isBrowser()) {
// roll back duplicates that aren't acknowledged
ArrayList< Pointer<MessageDispatch> > tmp;
- synchronized(&this->internal->dispatchedMessages) {
- tmp.copy(this->internal->dispatchedMessages);
+ synchronized(&this->internal->deliveredMessages) {
+ tmp.copy(this->internal->deliveredMessages);
}
Pointer< Iterator<Pointer<MessageDispatch> > > iter(tmp.iterator());
while (iter->hasNext()) {
@@ -898,43 +932,44 @@ void ActiveMQConsumerKernel::dispose() {
bool haveException = false;
ActiveMQException error;
- // Purge all the pending messages
- try{
- this->internal->unconsumedMessages->clear();
- } catch (ActiveMQException& ex){
- if( !haveException ){
- ex.setMark( __FILE__, __LINE__ );
- error = ex;
- haveException = true;
- }
- }
-
- // Stop and Wakeup all sync consumers.
- this->internal->unconsumedMessages->close();
-
- if (this->session->isIndividualAcknowledge()) {
+ if (!this->internal->session->isTransacted()) {
// For IndividualAck Mode we need to unlink the ack handler to remove a
// cyclic reference to the MessageDispatch that brought the message to us.
- synchronized(&internal->dispatchedMessages) {
- std::auto_ptr<Iterator<Pointer<MessageDispatch> > > iter(this->internal->dispatchedMessages.iterator());
- while (iter->hasNext()) {
- iter->next()->getMessage()->setAckHandler(Pointer<ActiveMQAckHandler>());
+ synchronized(&internal->deliveredMessages) {
+ if (this->session->isIndividualAcknowledge()) {
+ std::auto_ptr<Iterator<Pointer<MessageDispatch> > > iter(this->internal->deliveredMessages.iterator());
+ while (iter->hasNext()) {
+ iter->next()->getMessage()->setAckHandler(Pointer<ActiveMQAckHandler>());
+ }
}
-
- this->internal->dispatchedMessages.clear();
+ this->internal->deliveredMessages.clear();
}
}
+ // Stop and Wakeup all sync consumers.
+ this->internal->unconsumedMessages->close();
+
// Remove this Consumer from the Connections set of Dispatchers
Pointer<ActiveMQConsumerKernel> consumer(this);
try {
this->session->removeConsumer(consumer);
- } catch(Exception& e) {
+ } catch (Exception& e) {
consumer.release();
throw;
}
consumer.release();
+ // Ensure these are filtered as duplicates.
+ std::vector< Pointer<MessageDispatch> > list = this->internal->unconsumedMessages->removeAll();
+ if (!this->consumerInfo->isBrowser()) {
+ std::vector< Pointer<MessageDispatch> >::const_iterator iter = list.begin();
+
+ for (; iter != list.end(); ++iter) {
+ Pointer<MessageDispatch> md = *iter;
+ this->session->getConnection()->rollbackDuplicate(this, md->getMessage());
+ }
+ }
+
// If we encountered an error, propagate it.
if (haveException) {
error.setMark(__FILE__, __LINE__);
@@ -949,9 +984,8 @@ void ActiveMQConsumerKernel::dispose() {
////////////////////////////////////////////////////////////////////////////////
std::string ActiveMQConsumerKernel::getMessageSelector() const {
-
try {
- // Fetch the Selector
+ checkClosed();
return this->consumerInfo->getSelector();
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -970,7 +1004,6 @@ decaf::lang::Pointer<MessageDispatch> ActiveMQConsumerKernel::dequeue(long long
// Loop until the time is up or we get a non-expired message
while (true) {
-
Pointer<MessageDispatch> dispatch = this->internal->unconsumedMessages->dequeue(timeout);
if (dispatch == NULL) {
if (timeout > 0 && !this->internal->unconsumedMessages->isClosed()) {
@@ -979,7 +1012,7 @@ decaf::lang::Pointer<MessageDispatch> ActiveMQConsumerKernel::dequeue(long long
if (this->internal->failureError != NULL) {
throw CMSExceptionSupport::create(*this->internal->failureError);
} else {
- return Pointer<MessageDispatch> ();
+ return Pointer<MessageDispatch>();
}
}
} else if (dispatch->getMessage() == NULL) {
@@ -992,6 +1025,11 @@ decaf::lang::Pointer<MessageDispatch> ActiveMQConsumerKernel::dequeue(long long
}
continue;
+ } else if (internal->redeliveryExceeded(dispatch)) {
+ internal->posionAck(dispatch,
+ "dispatch to " + getConsumerId()->toString() +
+ " exceeds redelivery policy limit: " +
+ Integer::toString(internal->redeliveryPolicy->getMaximumRedeliveries()));
}
// Return the message.
@@ -1034,18 +1072,27 @@ cms::Message* ActiveMQConsumerKernel::receive() {
}
////////////////////////////////////////////////////////////////////////////////
-cms::Message* ActiveMQConsumerKernel::receive(int millisecs) {
+cms::Message* ActiveMQConsumerKernel::receive(int timeout) {
try {
this->checkClosed();
this->checkMessageListener();
+ if (timeout == 0) {
+ return this->receive();
+ }
// Send a request for a new message if needed
- this->sendPullRequest(millisecs);
+ this->sendPullRequest(timeout);
+
+ // Get the next available message, if there is one.
+ Pointer<MessageDispatch> message;
+ if (internal->info->getPrefetchSize() == 0) {
+ message = dequeue(-1); // Broker will signal if no message.
+ } else {
+ message = dequeue(timeout);
+ }
- // Wait for the next message.
- Pointer<MessageDispatch> message = dequeue(millisecs);
if (message == NULL) {
return NULL;
}
@@ -1072,7 +1119,13 @@ cms::Message* ActiveMQConsumerKernel::receiveNoWait() {
this->sendPullRequest(-1);
// Get the next available message, if there is one.
- Pointer<MessageDispatch> message = dequeue(0);
+ Pointer<MessageDispatch> message;
+ if (internal->info->getPrefetchSize() == 0) {
+ message = dequeue(-1); // Broker will signal if no message.
+ } else {
+ message = dequeue(0);
+ }
+
if (message == NULL) {
return NULL;
}
@@ -1132,8 +1185,8 @@ void ActiveMQConsumerKernel::beforeMessageIsConsumed(Pointer<MessageDispatch> di
if (!isAutoAcknowledgeBatch()) {
// When not in an Auto
- synchronized(&this->internal->dispatchedMessages) {
- this->internal->dispatchedMessages.addFirst(dispatch);
+ synchronized(&this->internal->deliveredMessages) {
+ this->internal->deliveredMessages.addFirst(dispatch);
}
if (this->session->isTransacted()) {
@@ -1184,8 +1237,8 @@ void ActiveMQConsumerKernel::afterMessageIsConsumed(Pointer<MessageDispatch> mes
if (isAutoAcknowledgeEach()) {
if (this->internal->deliveringAcks.compareAndSet(false, true)) {
- synchronized(&this->internal->dispatchedMessages) {
- if (!this->internal->dispatchedMessages.isEmpty()) {
+ synchronized(&this->internal->deliveredMessages) {
+ if (!this->internal->deliveredMessages.isEmpty()) {
if (this->internal->optimizeAcknowledge) {
this->internal->ackCounter++;
@@ -1193,7 +1246,7 @@ void ActiveMQConsumerKernel::afterMessageIsConsumed(Pointer<MessageDispatch> mes
Pointer<MessageAck> ack =
makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);
if (ack != NULL) {
- this->internal->dispatchedMessages.clear();
+ this->internal->deliveredMessages.clear();
this->internal->ackCounter = 0;
this->session->sendAck(ack);
this->internal->optimizeAckTimestamp = System::currentTimeMillis();
@@ -1213,7 +1266,7 @@ void ActiveMQConsumerKernel::afterMessageIsConsumed(Pointer<MessageDispatch> mes
Pointer<MessageAck> ack =
makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);
if (ack != NULL) {
- this->internal->dispatchedMessages.clear();
+ this->internal->deliveredMessages.clear();
session->sendAck(ack);
}
}
@@ -1226,8 +1279,8 @@ void ActiveMQConsumerKernel::afterMessageIsConsumed(Pointer<MessageDispatch> mes
ackLater(message, ActiveMQConstants::ACK_TYPE_CONSUMED);
} else if (session->isClientAcknowledge() || session->isIndividualAcknowledge()) {
bool messageUnackedByConsumer = false;
- synchronized(&this->internal->dispatchedMessages) {
- messageUnackedByConsumer = this->internal->dispatchedMessages.contains(message);
+ synchronized(&this->internal->deliveredMessages) {
+ messageUnackedByConsumer = this->internal->deliveredMessages.contains(message);
}
if (messageUnackedByConsumer) {
@@ -1251,17 +1304,16 @@ void ActiveMQConsumerKernel::deliverAcks() {
if (this->internal->deliveringAcks.compareAndSet(false, true)) {
if (isAutoAcknowledgeEach()) {
- synchronized(&this->internal->dispatchedMessages) {
+ synchronized(&this->internal->deliveredMessages) {
ack = makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);
if (ack != NULL) {
- this->internal->dispatchedMessages.clear();
+ this->internal->deliveredMessages.clear();
this->internal->ackCounter = 0;
} else {
ack.swap(internal->pendingAck);
}
}
- } else if (this->internal->pendingAck != NULL &&
- this->internal->pendingAck->getAckType() == ActiveMQConstants::ACK_TYPE_CONSUMED) {
+ } else if (this->internal->pendingAck != NULL && this->internal->pendingAck->isStandardAck()) {
ack.swap(this->internal->pendingAck);
}
@@ -1305,7 +1357,7 @@ void ActiveMQConsumerKernel::ackLater(Pointer<MessageDispatch> dispatch, int ack
} else {
// old pending ack being superseded by ack of another type, if it is not a delivered
// ack and hence important, send it now so it is not lost.
- if (oldPendingAck->getAckType() != ActiveMQConstants::ACK_TYPE_DELIVERED) {
+ if (!oldPendingAck->isDeliveredAck()) {
session->sendAck(oldPendingAck);
}
}
@@ -1327,13 +1379,13 @@ void ActiveMQConsumerKernel::ackLater(Pointer<MessageDispatch> dispatch, int ack
////////////////////////////////////////////////////////////////////////////////
Pointer<MessageAck> ActiveMQConsumerKernel::makeAckForAllDeliveredMessages(int type) {
- synchronized( &this->internal->dispatchedMessages ) {
+ synchronized( &this->internal->deliveredMessages ) {
- if (!this->internal->dispatchedMessages.isEmpty()) {
+ if (!this->internal->deliveredMessages.isEmpty()) {
- Pointer<MessageDispatch> dispatched = this->internal->dispatchedMessages.getFirst();
- Pointer<MessageAck> ack(new MessageAck(dispatched, type, this->internal->dispatchedMessages.size()));
- ack->setFirstMessageId(this->internal->dispatchedMessages.getLast()->getMessage()->getMessageId());
+ Pointer<MessageDispatch> dispatched = this->internal->deliveredMessages.getFirst();
+ Pointer<MessageAck> ack(new MessageAck(dispatched, type, this->internal->deliveredMessages.size()));
+ ack->setFirstMessageId(this->internal->deliveredMessages.getLast()->getMessage()->getMessageId());
return ack;
}
@@ -1356,8 +1408,8 @@ void ActiveMQConsumerKernel::acknowledge(Pointer<commands::MessageDispatch> disp
ack->setFirstMessageId(ack->getLastMessageId());
}
session->sendAck(ack);
- synchronized(&this->internal->dispatchedMessages) {
- this->internal->dispatchedMessages.remove(dispatch);
+ synchronized(&this->internal->deliveredMessages) {
+ this->internal->deliveredMessages.remove(dispatch);
}
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -1368,10 +1420,10 @@ void ActiveMQConsumerKernel::acknowledge() {
try {
- clearDispatchList();
+ clearDeliveredList();
this->internal->waitForRedeliveries();
- synchronized(&this->internal->dispatchedMessages) {
+ synchronized(&this->internal->deliveredMessages) {
// Acknowledge all messages so far.
Pointer<MessageAck> ack = makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);
@@ -1390,11 +1442,11 @@ void ActiveMQConsumerKernel::acknowledge() {
session->sendAck(ack);
// Adjust the counters
- this->internal->deliveredCounter = Math::max(0, this->internal->deliveredCounter - (int) this->internal->dispatchedMessages.size());
- this->internal->additionalWindowSize = Math::max(0, this->internal->additionalWindowSize - (int) this->internal->dispatchedMessages.size());
+ this->internal->deliveredCounter = Math::max(0, this->internal->deliveredCounter - (int) this->internal->deliveredMessages.size());
+ this->internal->additionalWindowSize = Math::max(0, this->internal->additionalWindowSize - (int) this->internal->deliveredMessages.size());
if (!session->isTransacted()) {
- this->internal->dispatchedMessages.clear();
+ this->internal->deliveredMessages.clear();
}
}
}
@@ -1404,8 +1456,9 @@ void ActiveMQConsumerKernel::acknowledge() {
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::commit() {
- synchronized(&(this->internal->dispatchedMessages)) {
- this->internal->dispatchedMessages.clear();
+ synchronized(&(this->internal->deliveredMessages)) {
+ this->internal->deliveredMessages.clear();
+ this->internal->clearPreviouslyDelivered();
}
this->internal->redeliveryDelay = 0;
}
@@ -1413,31 +1466,31 @@ void ActiveMQConsumerKernel::commit() {
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::rollback() {
- clearDispatchList();
+ clearDeliveredList();
synchronized(this->internal->unconsumedMessages.get()) {
if (this->internal->optimizeAcknowledge) {
// remove messages read but not acknowledged at the broker yet through optimizeAcknowledge
if (!this->consumerInfo->isBrowser()) {
- synchronized(&this->internal->dispatchedMessages) {
- for (int i = 0; (i < this->internal->dispatchedMessages.size()) &&
+ synchronized(&this->internal->deliveredMessages) {
+ for (int i = 0; (i < this->internal->deliveredMessages.size()) &&
(i < this->internal->ackCounter); i++) {
// ensure we don't filter this as a duplicate
- Pointer<MessageDispatch> md = this->internal->dispatchedMessages.removeLast();
+ Pointer<MessageDispatch> md = this->internal->deliveredMessages.removeLast();
session->getConnection()->rollbackDuplicate(this, md->getMessage());
}
}
}
}
- synchronized(&this->internal->dispatchedMessages) {
+ synchronized(&this->internal->deliveredMessages) {
this->internal->rollbackPreviouslyDeliveredAndNotRedelivered();
- if (this->internal->dispatchedMessages.isEmpty()) {
+ if (this->internal->deliveredMessages.isEmpty()) {
return;
}
// Only increase the redelivery delay after the first redelivery..
- Pointer<MessageDispatch> lastMsg = this->internal->dispatchedMessages.getFirst();
+ Pointer<MessageDispatch> lastMsg = this->internal->deliveredMessages.getFirst();
const int currentRedeliveryCount = lastMsg->getMessage()->getRedeliveryCounter();
if (currentRedeliveryCount > 0) {
this->internal->redeliveryDelay = this->internal->redeliveryPolicy->getNextRedeliveryDelay(internal->redeliveryDelay);
@@ -1445,9 +1498,9 @@ void ActiveMQConsumerKernel::rollback() {
this->internal->redeliveryDelay = this->internal->redeliveryPolicy->getInitialRedeliveryDelay();
}
- Pointer<MessageId> firstMsgId = this->internal->dispatchedMessages.getLast()->getMessage()->getMessageId();
+ Pointer<MessageId> firstMsgId = this->internal->deliveredMessages.getLast()->getMessage()->getMessageId();
- Pointer<Iterator<Pointer<MessageDispatch> > > iter(internal->dispatchedMessages.iterator());
+ Pointer<Iterator<Pointer<MessageDispatch> > > iter(internal->deliveredMessages.iterator());
while (iter->hasNext()) {
Pointer<Message> message = iter->next()->getMessage();
message->setRedeliveryCounter(message->getRedeliveryCounter() + 1);
@@ -1461,25 +1514,28 @@ void ActiveMQConsumerKernel::rollback() {
// We need to NACK the messages so that they get sent to the DLQ.
// Acknowledge the last message.
Pointer<MessageAck> ack(new MessageAck(lastMsg, ActiveMQConstants::ACK_TYPE_POISON,
- this->internal->dispatchedMessages.size()));
+ this->internal->deliveredMessages.size()));
ack->setFirstMessageId(firstMsgId);
- Pointer<BrokerError> cause(new BrokerError);
- ack->setPoisonCause(cause);
+ // TODO - Add cause to the message.
+ std::string message = "Exceeded redelivery policy limit:" +
+ Integer::toString(internal->redeliveryPolicy->getMaximumRedeliveries());
+ //", cause:" + lastMd.getRollbackCause(), lastMd.getRollbackCause()));
+ ack->setPoisonCause(internal->createBrokerError(message));
session->sendAck(ack, true);
// Adjust the window size.
this->internal->additionalWindowSize = Math::max(0,
- this->internal->additionalWindowSize - (int) this->internal->dispatchedMessages.size());
+ this->internal->additionalWindowSize - (int) this->internal->deliveredMessages.size());
this->internal->redeliveryDelay = 0;
- this->internal->deliveredCounter -= (int) internal->dispatchedMessages.size();
- this->internal->dispatchedMessages.clear();
+ this->internal->deliveredCounter -= (int) internal->deliveredMessages.size();
+ this->internal->deliveredMessages.clear();
} else {
// only redelivery_ack after first delivery
if (currentRedeliveryCount > 0) {
Pointer<MessageAck> ack(new MessageAck(lastMsg, ActiveMQConstants::ACK_TYPE_REDELIVERED,
- this->internal->dispatchedMessages.size()));
+ this->internal->deliveredMessages.size()));
ack->setFirstMessageId(firstMsgId);
session->sendAck(ack);
}
@@ -1493,8 +1549,8 @@ void ActiveMQConsumerKernel::rollback() {
NonBlockingRedeliveryTask* redeliveryTask =
new NonBlockingRedeliveryTask(session, self, this->internal);
- this->internal->deliveredCounter -= (int) internal->dispatchedMessages.size();
- this->internal->dispatchedMessages.clear();
+ this->internal->deliveredCounter -= (int) internal->deliveredMessages.size();
+ this->internal->deliveredMessages.clear();
this->session->getScheduler()->executeAfterDelay(
redeliveryTask, this->internal->redeliveryDelay);
@@ -1504,13 +1560,13 @@ void ActiveMQConsumerKernel::rollback() {
this->internal->unconsumedMessages->stop();
std::auto_ptr<Iterator<Pointer<MessageDispatch> > > iter(
- this->internal->dispatchedMessages.iterator());
+ this->internal->deliveredMessages.iterator());
while (iter->hasNext()) {
this->internal->unconsumedMessages->enqueueFirst(iter->next());
}
- this->internal->deliveredCounter -= (int) internal->dispatchedMessages.size();
- this->internal->dispatchedMessages.clear();
+ this->internal->deliveredCounter -= (int) internal->deliveredMessages.size();
+ this->internal->deliveredMessages.clear();
if (internal->redeliveryDelay > 0 && !this->internal->unconsumedMessages->isClosed()) {
Pointer<ActiveMQConsumerKernel> self =
@@ -1536,7 +1592,7 @@ void ActiveMQConsumerKernel::dispatch(const Pointer<MessageDispatch>& dispatch)
try {
clearMessagesInProgress();
- clearDispatchList();
+ clearDeliveredList();
synchronized(this->internal->unconsumedMessages.get()) {
@@ -1547,6 +1603,13 @@ void ActiveMQConsumerKernel::dispatch(const Pointer<MessageDispatch>& dispatch)
synchronized(&this->internal->listenerMutex) {
if (this->internal->listener != NULL && this->internal->unconsumedMessages->isRunning()) {
+ if (this->internal->redeliveryExceeded(dispatch)) {
+ internal->posionAck(dispatch,
+ "dispatch to " + getConsumerId()->toString() +
+ " exceeds redelivery policy limit:" +
+ Integer::toString(internal->redeliveryPolicy->getMaximumRedeliveries()));
+ return;
+ }
Pointer<cms::Message> message = createCMSMessage(dispatch);
beforeMessageIsConsumed(dispatch);
try {
@@ -1577,42 +1640,27 @@ void ActiveMQConsumerKernel::dispatch(const Pointer<MessageDispatch>& dispatch)
}
}
} else {
- if (!session->isTransacted()) {
- Pointer<MessageAck> ack(new MessageAck(dispatch, ActiveMQConstants::ACK_TYPE_INDIVIDUAL, 1));
- session->sendAck(ack);
- } else {
- bool needsPoisonAck = false;
- synchronized (&this->internal->dispatchedMessages) {
- if (this->internal->previouslyDeliveredMessages != NULL) {
- this->internal->previouslyDeliveredMessages->put(
- dispatch->getMessage()->getMessageId(), true);
- } else {
- // delivery while pending redelivery to another consumer on the same
- // connection not waiting for redelivery will help here
- needsPoisonAck = true;
- }
- }
- if (needsPoisonAck) {
- Pointer<MessageAck> poisonAck(new MessageAck(dispatch, ActiveMQConstants::ACK_TYPE_POISON, 1));
- poisonAck->setFirstMessageId(dispatch->getMessage()->getMessageId());
- Pointer<BrokerError> cause(new BrokerError);
- cause->setExceptionClass("javax.jms.JMSException");
- cause->setMessage(std::string() + "Duplicate dispatch with transacted " +
- "redeliver pending on another consumer, connection: " +
- this->session->getConnection()->getConnectionInfo().getConnectionId()->toString());
- poisonAck->setPoisonCause(cause);
- session->sendAck(poisonAck);
+ // deal with duplicate delivery
+ if (this->internal->redeliveryExpectedInCurrentTransaction(dispatch, true)) {
+ if (this->internal->transactedIndividualAck) {
+ immediateIndividualTransactedAck(dispatch);
} else {
- if (this->internal->transactedIndividualAck) {
- immediateIndividualTransactedAck(dispatch);
- } else {
- Pointer<MessageAck> ack(new MessageAck(dispatch, ActiveMQConstants::ACK_TYPE_DELIVERED, 1));
- session->sendAck(ack);
- }
+ Pointer<MessageAck> ack(new MessageAck(dispatch, ActiveMQConstants::ACK_TYPE_DELIVERED, 1));
+ internal->session->sendAck(ack);
}
+ } else if ((internal->redeliveryPendingInCompetingTransaction(dispatch))) {
+ internal->session->getConnection()->rollbackDuplicate(this, dispatch->getMessage());
+ this->dispatch(dispatch);
+ } else {
+ internal->posionAck(dispatch,
+ std::string("Suppressing duplicate delivery on connection, consumer ") + getConsumerId()->toString());
}
}
}
+ if (++internal->dispatchedCount % 1000 == 0) {
+ internal->dispatchedCount = 0;
+ Thread::yield();
+ }
}
}
AMQ_CATCH_RETHROW(ActiveMQException)
@@ -1673,8 +1721,7 @@ Pointer<cms::Message> ActiveMQConsumerKernel::createCMSMessage(Pointer<MessageDi
void ActiveMQConsumerKernel::sendPullRequest(long long timeout) {
try {
-
- clearDispatchList();
+ this->internal->clearDeliveredList();
// There are still local messages, consume them first.
if (!this->internal->unconsumedMessages->isEmpty()) {
@@ -1699,7 +1746,7 @@ void ActiveMQConsumerKernel::sendPullRequest(long long timeout) {
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::checkClosed() const {
if (this->isClosed()) {
- throw ActiveMQException(__FILE__, __LINE__, "Consumer Already Closed" );
+ throw cms::IllegalStateException("Consumer Already Closed" );
}
}
@@ -1731,16 +1778,21 @@ bool ActiveMQConsumerKernel::iterate() {
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::inProgressClearRequired() {
- this->internal->inProgressClearRequiredFlag = true;
+ this->internal->inProgressClearRequiredFlag.incrementAndGet();
// Clears dispatched messages async to avoid lock contention with inprogress acks.
- this->internal->clearDispatchList = true;
+ this->internal->isClearDeliveredList = true;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumerKernel::clearDeliveredList() {
+ this->internal->clearDeliveredList();
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumerKernel::clearMessagesInProgress() {
- if (this->internal->inProgressClearRequiredFlag) {
+ if (this->internal->inProgressClearRequiredFlag.get() > 0) {
synchronized(this->internal->unconsumedMessages.get()) {
- if (this->internal->inProgressClearRequiredFlag) {
+ if (this->internal->inProgressClearRequiredFlag.get() > 0) {
// ensure messages that were not yet consumed are rolled back up front as they
// may get redelivered to another consumer by the Broker.
@@ -1756,7 +1808,7 @@ void ActiveMQConsumerKernel::clearMessagesInProgress() {
// allow dispatch on this connection to resume
this->session->getConnection()->setTransportInterruptionProcessingComplete();
- this->internal->inProgressClearRequiredFlag = false;
+ this->internal->inProgressClearRequiredFlag.decrementAndGet();
// Wake up any blockers and allow them to recheck state.
this->internal->unconsumedMessages->notifyAll();
@@ -1766,11 +1818,6 @@ void ActiveMQConsumerKernel::clearMessagesInProgress() {
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumerKernel::clearDispatchList() {
- this->internal->doClearDispatchList();
-}
-
-////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConsumerKernel::isAutoAcknowledgeEach() const {
return this->session->isAutoAcknowledge() ||
(this->session->isDupsOkAcknowledge() && this->consumerInfo->getDestination()->isQueue());
@@ -2018,3 +2065,8 @@ bool ActiveMQConsumerKernel::isConsumerExpiryCheckEnabled() {
void ActiveMQConsumerKernel::setConsumerExpiryCheckEnabled(bool consumerExpiryCheckEnabled) {
this->internal->consumerExpiryCheckEnabled = consumerExpiryCheckEnabled;
}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConsumerKernel::isRedeliveryExpectedInCurrentTransaction(Pointer<MessageDispatch> dispatch) const {
+ return this->internal->redeliveryExpectedInCurrentTransaction(dispatch, false);
+}
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/4822f75d/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
index b77e2a5..dc5d1c8 100644
--- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
+++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
@@ -368,6 +368,16 @@ namespace kernels {
*/
void setConsumerExpiryCheckEnabled(bool consumerExpiryCheckEnabled);
+ /**
+ * Returns true if the given MessageDispatch is expected to be redelivered in the
+ * currently open transaction. This would be true for any message that was previously
+ * delivered in a transaction and a failover occurred prior to the transaction being
+ * completed.
+ *
+ * @return true if the given dispatch needs to be delivered to this consumer to recover.
+ */
+ bool isRedeliveryExpectedInCurrentTransaction(Pointer<commands::MessageDispatch> dispatch) const;
+
protected:
/**
@@ -424,7 +434,7 @@ namespace kernels {
void registerSync();
- void clearDispatchList();
+ void clearDeliveredList();
};
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/4822f75d/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
index aaf0a38..e6d2760 100644
--- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
+++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
@@ -1523,3 +1523,18 @@ bool ActiveMQSessionKernel::isSessionAsyncDispatch() const {
void ActiveMQSessionKernel::setSessionAsyncDispatch(bool sessionAsyncDispatch) {
this->config->sessionAsyncDispatch = sessionAsyncDispatch;
}
+
+////////////////////////////////////////////////////////////////////////////////
+decaf::util::ArrayList< Pointer<ActiveMQConsumerKernel> > ActiveMQSessionKernel::getConsumers() const {
+ ArrayList< Pointer<ActiveMQConsumerKernel> > result;
+ this->config->consumerLock.readLock().lock();
+ try {
+ result.addAll(this->config->consumers);
+ this->config->consumerLock.readLock().unlock();
+ } catch (Exception& ex) {
+ this->config->consumerLock.readLock().unlock();
+ throw;
+ }
+
+ return result;
+}
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/4822f75d/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
index d62beb5..5247e7c 100644
--- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
+++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
@@ -41,6 +41,7 @@
#include <activemq/threads/Scheduler.h>
#include <decaf/lang/Pointer.h>
+#include <decaf/util/ArrayList.h>
#include <decaf/util/Properties.h>
#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
#include <decaf/util/concurrent/atomic/AtomicInteger.h>
@@ -584,6 +585,15 @@ namespace kernels {
*/
void setSessionAsyncDispatch(bool sessionAsyncDispatch);
+ /**
+ * Returns an ArrayList containing a copy of all consumers currently in
+ * use on this Session. Since this list is copied from the main consumers
+ * list the usage is thread safe after return.
+ *
+ * @return a list containing a pointer to each consumer active in this session.
+ */
+ decaf::util::ArrayList< Pointer<ActiveMQConsumerKernel> > getConsumers() const;
+
private:
/**
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/4822f75d/activemq-cpp/src/test-integration/TestRegistry.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/TestRegistry.cpp b/activemq-cpp/src/test-integration/TestRegistry.cpp
index a1244ce..2df8e3a 100644
--- a/activemq-cpp/src/test-integration/TestRegistry.cpp
+++ b/activemq-cpp/src/test-integration/TestRegistry.cpp
@@ -71,7 +71,7 @@ CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMessagePriori
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMapMessageTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireNonBlockingRedeliveryTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireOptimizedAckTest );
-CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireQueueBrowserTest );
+//CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireQueueBrowserTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenWireRedeliveryPolicyTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireSimpleRollbackTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireSimpleRollbackTest );
http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/4822f75d/activemq-cpp/src/test-integration/activemq/util/IntegrationCommon.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/util/IntegrationCommon.cpp b/activemq-cpp/src/test-integration/activemq/util/IntegrationCommon.cpp
index abe9973..d079493 100644
--- a/activemq-cpp/src/test-integration/activemq/util/IntegrationCommon.cpp
+++ b/activemq-cpp/src/test-integration/activemq/util/IntegrationCommon.cpp
@@ -30,7 +30,7 @@ IntegrationCommon::IntegrationCommon() : urlCommon(), stompURL(), openwireURL()
this->urlCommon = "tcp://localhost:";
this->stompURL = this->urlCommon + "61613?wireFormat=stomp";
- this->openwireURL = this->urlCommon + "61616?wireFormat=openwire";
+ this->openwireURL = this->urlCommon + "61616?transport.trace=false";
}
////////////////////////////////////////////////////////////////////////////////