You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2008/11/10 02:09:40 UTC
svn commit: r712590 - in /activemq/activemq-cpp/trunk/src:
main/activemq/connector/openwire/ main/activemq/transport/
test/activemq/transport/ test/activemq/transport/filters/
Author: tabish
Date: Sun Nov 9 17:09:40 2008
New Revision: 712590
URL: http://svn.apache.org/viewvc?rev=712590&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-200
Add a way for the OpenWireFormatNegotiator to drill down to the IOTransport to circumvent and other transport filters so that thread safety is ensure on the initial WireFormatInfo command send.
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp
activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h
activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h
activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h
activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h
activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp
activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.h
activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.cpp
activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.h
Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp?rev=712590&r1=712589&r2=712590&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp Sun Nov 9 17:09:40 2008
@@ -19,6 +19,7 @@
#include <activemq/connector/openwire/commands/DataStructure.h>
#include <activemq/connector/openwire/commands/WireFormatInfo.h>
+#include <activemq/transport/IOTransport.h>
using namespace std;
using namespace activemq;
@@ -228,8 +229,18 @@
try {
+ // Circumvent all other Transport filters and go straight for the base
+ // IOTransport, this should guarantee that there's no funny business done
+ // like async dispatch etc. If it can't be found just use next and hope that
+ // there's nothing that will break the necessary thread locking that protects
+ // the message as it marshaled out to the wire
+ Transport* transport = this->next->narrow( typeid( transport::IOTransport ) );
+ if( transport == NULL ) {
+ transport = this->next;
+ }
+
// We first send the WireFormat that we'd prefer.
- next->oneway( openWireFormat->getPreferedWireFormatInfo() );
+ transport->oneway( openWireFormat->getPreferedWireFormatInfo() );
// Mark the latch
wireInfoSentDownLatch.countDown();
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h?rev=712590&r1=712589&r2=712590&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h Sun Nov 9 17:09:40 2008
@@ -240,6 +240,22 @@
*/
virtual void run();
+ /**
+ * Narrows down a Chain of Transports to a specific Transport to allow a
+ * higher level transport to skip intermediate Transports in certain
+ * circumstances.
+ *
+ * @param typeId - The type_info of the Object we are searching for.
+ *
+ * @return the requested Object. or NULL if its not in this chain.
+ */
+ virtual Transport* narrow( const std::type_info& typeId ) {
+ if( typeid( *this ) == typeId ) {
+ return this;
+ }
+
+ return NULL;
+ }
};
}}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h?rev=712590&r1=712589&r2=712590&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h Sun Nov 9 17:09:40 2008
@@ -268,6 +268,23 @@
virtual void start() throw (cms::CMSException){}
virtual void close() throw (cms::CMSException){}
+ /**
+ * Narrows down a Chain of Transports to a specific Transport to allow a
+ * higher level transport to skip intermediate Transports in certain
+ * circumstances.
+ *
+ * @param typeId - The type_info of the Object we are searching for.
+ *
+ * @return the requested Object. or NULL if its not in this chain.
+ */
+ virtual Transport* narrow( const std::type_info& typeId ) {
+ if( typeid( *this ) == typeId ) {
+ return this;
+ }
+
+ return NULL;
+ }
+
protected:
unsigned int getNextCommandId() throw( exceptions::ActiveMQException );
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h?rev=712590&r1=712589&r2=712590&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h Sun Nov 9 17:09:40 2008
@@ -27,6 +27,7 @@
#include <activemq/transport/Response.h>
#include <cms/Startable.h>
#include <cms/Closeable.h>
+#include <typeinfo>
namespace activemq{
namespace transport{
@@ -119,6 +120,17 @@
virtual void setTransportExceptionListener(
TransportExceptionListener* listener ) = 0;
+ /**
+ * Narrows down a Chain of Transports to a specific Transport to allow a
+ * higher level transport to skip intermediate Transports in certain
+ * circumstances.
+ *
+ * @param typeId - The type_info of the Object we are searching for.
+ *
+ * @return the requested Object. or NULL if its not in this chain.
+ */
+ virtual Transport* narrow( const std::type_info& typeId ) = 0;
+
};
}}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h?rev=712590&r1=712589&r2=712590&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h Sun Nov 9 17:09:40 2008
@@ -24,6 +24,7 @@
#include <activemq/transport/CommandListener.h>
#include <activemq/transport/Command.h>
#include <activemq/transport/TransportExceptionListener.h>
+#include <typeinfo>
namespace activemq{
namespace transport{
@@ -222,6 +223,25 @@
next->close();
}
+ /**
+ * Narrows down a Chain of Transports to a specific Transport to allow a
+ * higher level transport to skip intermediate Transports in certain
+ * circumstances.
+ *
+ * @param typeId - The type_info of the Object we are searching for.
+ *
+ * @return the requested Object. or NULL if its not in this chain.
+ */
+ virtual Transport* narrow( const std::type_info& typeId ) {
+ if( typeid( *this ) == typeId ) {
+ return this;
+ } else if( this->next != NULL ) {
+ return this->next->narrow( typeId );
+ }
+
+ return NULL;
+ }
+
};
}}
Modified: activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp?rev=712590&r1=712589&r2=712590&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp Sun Nov 9 17:09:40 2008
@@ -423,3 +423,19 @@
transport.close();
}
+
+////////////////////////////////////////////////////////////////////////////////
+void IOTransportTest::testNarrow(){
+
+ IOTransport transport;
+
+ Transport* narrowed = transport.narrow( typeid( transport ) );
+ CPPUNIT_ASSERT( narrowed == &transport );
+
+ narrowed = transport.narrow( typeid( std::string() ) );
+ CPPUNIT_ASSERT( narrowed == NULL );
+
+ narrowed = transport.narrow( typeid( transport::IOTransport ) );
+ CPPUNIT_ASSERT( narrowed == &transport );
+
+}
Modified: activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.h?rev=712590&r1=712589&r2=712590&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.h (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.h Sun Nov 9 17:09:40 2008
@@ -33,6 +33,7 @@
CPPUNIT_TEST( testRead );
CPPUNIT_TEST( testWrite );
CPPUNIT_TEST( testException );
+ CPPUNIT_TEST( testNarrow );
CPPUNIT_TEST_SUITE_END();
public:
@@ -44,6 +45,7 @@
void testRead();
void testStartClose();
void testStressTransportStartClose();
+ void testNarrow();
};
Modified: activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.cpp?rev=712590&r1=712589&r2=712590&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.cpp Sun Nov 9 17:09:40 2008
@@ -223,3 +223,26 @@
AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
}
+
+////////////////////////////////////////////////////////////////////////////////
+void ResponseCorrelatorTest::testNarrow(){
+
+ MyTransport transport;
+ ResponseCorrelator correlator( &transport, false );
+
+ Transport* narrowed = correlator.narrow( typeid( transport ) );
+ CPPUNIT_ASSERT( narrowed == &transport );
+
+ narrowed = correlator.narrow( typeid( std::string() ) );
+ CPPUNIT_ASSERT( narrowed == NULL );
+
+ narrowed = correlator.narrow( typeid( MyTransport ) );
+ CPPUNIT_ASSERT( narrowed == &transport );
+
+ narrowed = correlator.narrow( typeid( transport::filters::ResponseCorrelator ) );
+ CPPUNIT_ASSERT( narrowed == &correlator );
+
+ narrowed = correlator.narrow( typeid( correlator ) );
+ CPPUNIT_ASSERT( narrowed == &correlator );
+
+}
Modified: activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.h?rev=712590&r1=712589&r2=712590&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.h (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.h Sun Nov 9 17:09:40 2008
@@ -39,6 +39,7 @@
CPPUNIT_TEST( testOneway );
CPPUNIT_TEST( testTransportException );
CPPUNIT_TEST( testMultiRequests );
+ CPPUNIT_TEST( testNarrow );
CPPUNIT_TEST_SUITE_END();
public:
@@ -277,6 +278,14 @@
}
}
}
+
+ virtual Transport* narrow( const std::type_info& typeId ) {
+ if( typeid( *this ) == typeId ) {
+ return this;
+ }
+
+ return NULL;
+ }
};
class MyBrokenTransport : public MyTransport{
@@ -370,6 +379,7 @@
void testOneway();
void testTransportException();
void testMultiRequests();
+ void testNarrow();
};