You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/04/21 17:11:32 UTC

svn commit: r1095745 - in /activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src: main/activemq/state/ main/activemq/transport/ main/activemq/transport/failover/ test/activemq/transport/correlator/

Author: tabish
Date: Thu Apr 21 15:11:32 2011
New Revision: 1095745

URL: http://svn.apache.org/viewvc?rev=1095745&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQCPP-364

Modified:
    activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp
    activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/IOTransport.h
    activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/Transport.h
    activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/TransportFilter.h
    activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
    activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
    activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp

Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp?rev=1095745&r1=1095744&r2=1095745&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp Thu Apr 21 15:11:32 2011
@@ -24,6 +24,7 @@
 #include <activemq/commands/RemoveInfo.h>
 #include <activemq/core/ActiveMQConstants.h>
 #include <activemq/transport/TransportListener.h>
+#include <activemq/wireformat/WireFormat.h>
 
 using namespace activemq;
 using namespace activemq::core;
@@ -175,10 +176,6 @@ void ConnectionStateTracker::doRestoreTr
 
         for( ; iter != transactionStates.end(); ++iter ) {
 
-            //if( LOG.isDebugEnabled() ) {
-            //    LOG.debug("tx: " + transactionState.getId());
-            //}
-
             // ignore any empty (ack) transaction
             if( (*iter)->getCommands().size() == 2 ) {
                 Pointer<Command> lastCommand = (*iter)->getCommands().get(1);
@@ -186,9 +183,6 @@ void ConnectionStateTracker::doRestoreTr
                     Pointer<TransactionInfo> transactionInfo = lastCommand.dynamicCast<TransactionInfo>();
 
                     if( transactionInfo->getType() == ActiveMQConstants::TRANSACTION_STATE_COMMITONEPHASE ) {
-                        //if( LOG.isDebugEnabled() ) {
-                        //    LOG.debug("not replaying empty (ack) tx: " + transactionState.getId());
-                        //}
                         toIgnore.push_back(lastCommand);
                         continue;
                     }
@@ -200,9 +194,6 @@ void ConnectionStateTracker::doRestoreTr
             std::vector< Pointer<ProducerState> >::const_iterator state = producerStates.begin();
 
             for( ; state != producerStates.end(); ++state ) {
-                //if( LOG.isDebugEnabled() ) {
-                //    LOG.debug("tx replay producer :" + producerState.getInfo());
-                //}
                 transport->oneway( (*state)->getInfo() );
             }
 
@@ -215,9 +206,6 @@ void ConnectionStateTracker::doRestoreTr
 
             state = producerStates.begin();
             for( ; state != producerStates.end(); ++state ) {
-                //if( LOG.isDebugEnabled() ) {
-                //    LOG.debug("tx remove replayed producer :" + producerState.getInfo());
-                //}
                 transport->oneway( (*state)->getInfo()->createRemoveCommand() );
             }
         }
@@ -282,8 +270,9 @@ void ConnectionStateTracker::doRestoreCo
         for( ; state != consumerStates.end(); ++state ) {
 
             Pointer<ConsumerInfo> infoToSend = (*state)->getInfo();
+            Pointer<wireformat::WireFormat> wireFormat = transport->getWireFormat();
 
-            if( !connectionInterruptionProcessingComplete && infoToSend->getPrefetchSize() > 0) {
+            if( !connectionInterruptionProcessingComplete && infoToSend->getPrefetchSize() > 0 && wireFormat->getVersion() > 5) {
 
                 infoToSend.reset( (*state)->getInfo()->cloneDataStructure() );
                 connectionState->getRecoveringPullConsumers().put( infoToSend->getConsumerId(), (*state)->getInfo() );

Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/IOTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/IOTransport.h?rev=1095745&r1=1095744&r2=1095745&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/IOTransport.h (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/IOTransport.h Thu Apr 21 15:11:32 2011
@@ -164,6 +164,10 @@ namespace transport{
          */
         virtual Pointer<Response> request( const Pointer<Command>& command, unsigned int timeout );
 
+        virtual Pointer<wireformat::WireFormat> getWireFormat() const {
+        	return this->wireFormat;
+        }
+
         virtual void setWireFormat( const Pointer<wireformat::WireFormat>& wireFormat ){
             this->wireFormat = wireFormat;
         }

Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/Transport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/Transport.h?rev=1095745&r1=1095744&r2=1095745&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/Transport.h (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/Transport.h Thu Apr 21 15:11:32 2011
@@ -117,6 +117,15 @@ namespace transport{
         virtual Pointer<Response> request( const Pointer<Command>& command, unsigned int timeout ) = 0;
 
         /**
+         * Gets the WireFormat instance that is in use by this transport.  In the case of
+         * nested transport this method delegates down to the lowest level transport that
+         * actually maintains a WireFormat info instance.
+         *
+         * @returns The WireFormat the object used to encode / decode commands.
+         */
+        virtual Pointer<wireformat::WireFormat> getWireFormat() const = 0;
+
+        /**
          * Sets the WireFormat instance to use.
          * @param wireFormat
          *      The WireFormat the object used to encode / decode commands.

Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/TransportFilter.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/TransportFilter.h?rev=1095745&r1=1095744&r2=1095745&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/TransportFilter.h (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/TransportFilter.h Thu Apr 21 15:11:32 2011
@@ -129,6 +129,10 @@ namespace transport{
             return this->listener;
         }
 
+        virtual Pointer<wireformat::WireFormat> getWireFormat() const {
+        	return next->getWireFormat();
+        }
+
         virtual void setWireFormat( const Pointer<wireformat::WireFormat>& wireFormat ) {
             next->setWireFormat( wireFormat );
         }

Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=1095745&r1=1095744&r2=1095745&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp Thu Apr 21 15:11:32 2011
@@ -917,6 +917,19 @@ void FailoverTransport::processResponse(
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+Pointer<wireformat::WireFormat> FailoverTransport::getWireFormat() const {
+
+	Pointer<wireformat::WireFormat> result;
+	Pointer<Transport> transport = this->connectedTransport;
+
+    if( transport != NULL ) {
+        result = transport->getWireFormat();
+    }
+
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
 long long FailoverTransport::getTimeout() const {
     return this->timeout;
 }

Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h?rev=1095745&r1=1095744&r2=1095745&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h Thu Apr 21 15:11:32 2011
@@ -145,6 +145,8 @@ namespace failover {
 
         virtual Pointer<Response> request( const Pointer<Command>& command, unsigned int timeout );
 
+        virtual Pointer<wireformat::WireFormat> getWireFormat() const;
+
         virtual void setWireFormat( const Pointer<wireformat::WireFormat>& wireFormat AMQCPP_UNUSED ) {}
 
         virtual void setTransportListener( TransportListener* listener );

Modified: activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp?rev=1095745&r1=1095744&r2=1095745&view=diff
==============================================================================
--- activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp (original)
+++ activemq/activemq-cpp/branches/activemq-cpp-3.4.x/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp Thu Apr 21 15:11:32 2011
@@ -105,6 +105,10 @@ namespace correlator{
                 __FILE__, __LINE__, "stuff" );
         }
 
+        virtual Pointer<wireformat::WireFormat> getWireFormat() const {
+        	return Pointer<wireformat::WireFormat>();
+        }
+
         virtual void setWireFormat(
             const Pointer<wireformat::WireFormat>& wireFormat AMQCPP_UNUSED ) {}