You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/04/21 17:11:32 UTC
svn commit: r1095745 - in
/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src:
main/activemq/state/ main/activemq/transport/
main/activemq/transport/failover/ test/activemq/transport/correlator/
Author: tabish
Date: Thu Apr 21 15:11:32 2011
New Revision: 1095745
URL: http://svn.apache.org/viewvc?rev=1095745&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQCPP-364
Modified:
activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp
activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/IOTransport.h
activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/Transport.h
activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/TransportFilter.h
activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp
Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp?rev=1095745&r1=1095744&r2=1095745&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp Thu Apr 21 15:11:32 2011
@@ -24,6 +24,7 @@
#include <activemq/commands/RemoveInfo.h>
#include <activemq/core/ActiveMQConstants.h>
#include <activemq/transport/TransportListener.h>
+#include <activemq/wireformat/WireFormat.h>
using namespace activemq;
using namespace activemq::core;
@@ -175,10 +176,6 @@ void ConnectionStateTracker::doRestoreTr
for( ; iter != transactionStates.end(); ++iter ) {
- //if( LOG.isDebugEnabled() ) {
- // LOG.debug("tx: " + transactionState.getId());
- //}
-
// ignore any empty (ack) transaction
if( (*iter)->getCommands().size() == 2 ) {
Pointer<Command> lastCommand = (*iter)->getCommands().get(1);
@@ -186,9 +183,6 @@ void ConnectionStateTracker::doRestoreTr
Pointer<TransactionInfo> transactionInfo = lastCommand.dynamicCast<TransactionInfo>();
if( transactionInfo->getType() == ActiveMQConstants::TRANSACTION_STATE_COMMITONEPHASE ) {
- //if( LOG.isDebugEnabled() ) {
- // LOG.debug("not replaying empty (ack) tx: " + transactionState.getId());
- //}
toIgnore.push_back(lastCommand);
continue;
}
@@ -200,9 +194,6 @@ void ConnectionStateTracker::doRestoreTr
std::vector< Pointer<ProducerState> >::const_iterator state = producerStates.begin();
for( ; state != producerStates.end(); ++state ) {
- //if( LOG.isDebugEnabled() ) {
- // LOG.debug("tx replay producer :" + producerState.getInfo());
- //}
transport->oneway( (*state)->getInfo() );
}
@@ -215,9 +206,6 @@ void ConnectionStateTracker::doRestoreTr
state = producerStates.begin();
for( ; state != producerStates.end(); ++state ) {
- //if( LOG.isDebugEnabled() ) {
- // LOG.debug("tx remove replayed producer :" + producerState.getInfo());
- //}
transport->oneway( (*state)->getInfo()->createRemoveCommand() );
}
}
@@ -282,8 +270,9 @@ void ConnectionStateTracker::doRestoreCo
for( ; state != consumerStates.end(); ++state ) {
Pointer<ConsumerInfo> infoToSend = (*state)->getInfo();
+ Pointer<wireformat::WireFormat> wireFormat = transport->getWireFormat();
- if( !connectionInterruptionProcessingComplete && infoToSend->getPrefetchSize() > 0) {
+ if( !connectionInterruptionProcessingComplete && infoToSend->getPrefetchSize() > 0 && wireFormat->getVersion() > 5) {
infoToSend.reset( (*state)->getInfo()->cloneDataStructure() );
connectionState->getRecoveringPullConsumers().put( infoToSend->getConsumerId(), (*state)->getInfo() );
Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/IOTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/IOTransport.h?rev=1095745&r1=1095744&r2=1095745&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/IOTransport.h (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/IOTransport.h Thu Apr 21 15:11:32 2011
@@ -164,6 +164,10 @@ namespace transport{
*/
virtual Pointer<Response> request( const Pointer<Command>& command, unsigned int timeout );
+ virtual Pointer<wireformat::WireFormat> getWireFormat() const {
+ return this->wireFormat;
+ }
+
virtual void setWireFormat( const Pointer<wireformat::WireFormat>& wireFormat ){
this->wireFormat = wireFormat;
}
Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/Transport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/Transport.h?rev=1095745&r1=1095744&r2=1095745&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/Transport.h (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/Transport.h Thu Apr 21 15:11:32 2011
@@ -117,6 +117,15 @@ namespace transport{
virtual Pointer<Response> request( const Pointer<Command>& command, unsigned int timeout ) = 0;
/**
+ * Gets the WireFormat instance that is in use by this transport. In the case of
+ * nested transports, this method delegates down to the lowest-level transport that
+ * actually maintains a WireFormat info instance.
+ *
+ * @returns The WireFormat object used to encode / decode commands.
+ */
+ virtual Pointer<wireformat::WireFormat> getWireFormat() const = 0;
+
+ /**
* Sets the WireFormat instance to use.
* @param wireFormat
* The WireFormat the object used to encode / decode commands.
Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/TransportFilter.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/TransportFilter.h?rev=1095745&r1=1095744&r2=1095745&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/TransportFilter.h (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/TransportFilter.h Thu Apr 21 15:11:32 2011
@@ -129,6 +129,10 @@ namespace transport{
return this->listener;
}
+ virtual Pointer<wireformat::WireFormat> getWireFormat() const {
+ return next->getWireFormat();
+ }
+
virtual void setWireFormat( const Pointer<wireformat::WireFormat>& wireFormat ) {
next->setWireFormat( wireFormat );
}
Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=1095745&r1=1095744&r2=1095745&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp Thu Apr 21 15:11:32 2011
@@ -917,6 +917,19 @@ void FailoverTransport::processResponse(
}
////////////////////////////////////////////////////////////////////////////////
+Pointer<wireformat::WireFormat> FailoverTransport::getWireFormat() const {
+
+ Pointer<wireformat::WireFormat> result;
+ Pointer<Transport> transport = this->connectedTransport;
+
+ if( transport != NULL ) {
+ result = transport->getWireFormat();
+ }
+
+ return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
long long FailoverTransport::getTimeout() const {
return this->timeout;
}
Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h?rev=1095745&r1=1095744&r2=1095745&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h Thu Apr 21 15:11:32 2011
@@ -145,6 +145,8 @@ namespace failover {
virtual Pointer<Response> request( const Pointer<Command>& command, unsigned int timeout );
+ virtual Pointer<wireformat::WireFormat> getWireFormat() const;
+
virtual void setWireFormat( const Pointer<wireformat::WireFormat>& wireFormat AMQCPP_UNUSED ) {}
virtual void setTransportListener( TransportListener* listener );
Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp?rev=1095745&r1=1095744&r2=1095745&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp Thu Apr 21 15:11:32 2011
@@ -105,6 +105,10 @@ namespace correlator{
__FILE__, __LINE__, "stuff" );
}
+ virtual Pointer<wireformat::WireFormat> getWireFormat() const {
+ return Pointer<wireformat::WireFormat>();
+ }
+
virtual void setWireFormat(
const Pointer<wireformat::WireFormat>& wireFormat AMQCPP_UNUSED ) {}