You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2008/11/10 02:09:40 UTC

svn commit: r712590 - in /activemq/activemq-cpp/trunk/src: main/activemq/connector/openwire/ main/activemq/transport/ test/activemq/transport/ test/activemq/transport/filters/

Author: tabish
Date: Sun Nov  9 17:09:40 2008
New Revision: 712590

URL: http://svn.apache.org/viewvc?rev=712590&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-200

Add a way for the OpenWireFormatNegotiator to drill down to the IOTransport, circumventing any other transport filters, so that thread safety is ensured when the initial WireFormatInfo command is sent.

Modified:
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h
    activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp
    activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.h
    activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.cpp
    activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.h

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp?rev=712590&r1=712589&r2=712590&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp Sun Nov  9 17:09:40 2008
@@ -19,6 +19,7 @@
 
 #include <activemq/connector/openwire/commands/DataStructure.h>
 #include <activemq/connector/openwire/commands/WireFormatInfo.h>
+#include <activemq/transport/IOTransport.h>
 
 using namespace std;
 using namespace activemq;
@@ -228,8 +229,18 @@
 
         try {
 
+            // Circumvent all other Transport filters and go straight for the base
+            // IOTransport, this should guarantee that there's no funny business done
+            // like async dispatch etc.  If it can't be found just use next and hope that
+            // there's nothing that will break the necessary thread locking that protects
+            // the message as it is marshaled out to the wire.
+            Transport* transport = this->next->narrow( typeid( transport::IOTransport ) );
+            if( transport == NULL ) {
+                transport = this->next;
+            }
+
             // We first send the WireFormat that we'd prefer.
-            next->oneway( openWireFormat->getPreferedWireFormatInfo() );
+            transport->oneway( openWireFormat->getPreferedWireFormatInfo() );
 
             // Mark the latch
             wireInfoSentDownLatch.countDown();

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h?rev=712590&r1=712589&r2=712590&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h Sun Nov  9 17:09:40 2008
@@ -240,6 +240,22 @@
          */
         virtual void run();
 
+        /**
+         * Narrows down a Chain of Transports to a specific Transport to allow a
+         * higher level transport to skip intermediate Transports in certain
+         * circumstances.
+         *
+         * @param typeId - The type_info of the Object we are searching for.
+         *
+         * @return the requested Object, or NULL if it is not in this chain.
+         */
+        virtual Transport* narrow( const std::type_info& typeId ) {
+            if( typeid( *this ) == typeId ) {
+                return this;
+            }
+
+            return NULL;
+        }
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h?rev=712590&r1=712589&r2=712590&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h Sun Nov  9 17:09:40 2008
@@ -268,6 +268,23 @@
         virtual void start() throw (cms::CMSException){}
         virtual void close() throw (cms::CMSException){}
 
+        /**
+         * Narrows down a Chain of Transports to a specific Transport to allow a
+         * higher level transport to skip intermediate Transports in certain
+         * circumstances.
+         *
+         * @param typeId - The type_info of the Object we are searching for.
+         *
+         * @return the requested Object, or NULL if it is not in this chain.
+         */
+        virtual Transport* narrow( const std::type_info& typeId ) {
+            if( typeid( *this ) == typeId ) {
+                return this;
+            }
+
+            return NULL;
+        }
+
     protected:
 
         unsigned int getNextCommandId() throw( exceptions::ActiveMQException );

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h?rev=712590&r1=712589&r2=712590&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h Sun Nov  9 17:09:40 2008
@@ -27,6 +27,7 @@
 #include <activemq/transport/Response.h>
 #include <cms/Startable.h>
 #include <cms/Closeable.h>
+#include <typeinfo>
 
 namespace activemq{
 namespace transport{
@@ -119,6 +120,17 @@
         virtual void setTransportExceptionListener(
             TransportExceptionListener* listener ) = 0;
 
+        /**
+         * Narrows down a Chain of Transports to a specific Transport to allow a
+         * higher level transport to skip intermediate Transports in certain
+         * circumstances.
+         *
+         * @param typeId - The type_info of the Object we are searching for.
+         *
+         * @return the requested Object, or NULL if it is not in this chain.
+         */
+        virtual Transport* narrow( const std::type_info& typeId ) = 0;
+
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h?rev=712590&r1=712589&r2=712590&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h Sun Nov  9 17:09:40 2008
@@ -24,6 +24,7 @@
 #include <activemq/transport/CommandListener.h>
 #include <activemq/transport/Command.h>
 #include <activemq/transport/TransportExceptionListener.h>
+#include <typeinfo>
 
 namespace activemq{
 namespace transport{
@@ -222,6 +223,25 @@
             next->close();
         }
 
+        /**
+         * Narrows down a Chain of Transports to a specific Transport to allow a
+         * higher level transport to skip intermediate Transports in certain
+         * circumstances.
+         *
+         * @param typeId - The type_info of the Object we are searching for.
+         *
+         * @return the requested Object, or NULL if it is not in this chain.
+         */
+        virtual Transport* narrow( const std::type_info& typeId ) {
+            if( typeid( *this ) == typeId ) {
+                return this;
+            } else if( this->next != NULL ) {
+                return this->next->narrow( typeId );
+            }
+
+            return NULL;
+        }
+
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp?rev=712590&r1=712589&r2=712590&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp Sun Nov  9 17:09:40 2008
@@ -423,3 +423,19 @@
 
     transport.close();
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void IOTransportTest::testNarrow(){
+
+    IOTransport transport;
+
+    Transport* narrowed = transport.narrow( typeid( transport ) );
+    CPPUNIT_ASSERT( narrowed == &transport );
+
+    narrowed = transport.narrow( typeid( std::string() ) );
+    CPPUNIT_ASSERT( narrowed == NULL );
+
+    narrowed = transport.narrow( typeid( transport::IOTransport ) );
+    CPPUNIT_ASSERT( narrowed == &transport );
+
+}

Modified: activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.h?rev=712590&r1=712589&r2=712590&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.h (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.h Sun Nov  9 17:09:40 2008
@@ -33,6 +33,7 @@
         CPPUNIT_TEST( testRead );
         CPPUNIT_TEST( testWrite );
         CPPUNIT_TEST( testException );
+        CPPUNIT_TEST( testNarrow );
         CPPUNIT_TEST_SUITE_END();
 
     public:
@@ -44,6 +45,7 @@
         void testRead();
         void testStartClose();
         void testStressTransportStartClose();
+        void testNarrow();
 
     };
 

Modified: activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.cpp?rev=712590&r1=712589&r2=712590&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.cpp Sun Nov  9 17:09:40 2008
@@ -223,3 +223,26 @@
     AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
     AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void ResponseCorrelatorTest::testNarrow(){
+
+    MyTransport transport;
+    ResponseCorrelator correlator( &transport, false );
+
+    Transport* narrowed = correlator.narrow( typeid( transport ) );
+    CPPUNIT_ASSERT( narrowed == &transport );
+
+    narrowed = correlator.narrow( typeid( std::string() ) );
+    CPPUNIT_ASSERT( narrowed == NULL );
+
+    narrowed = correlator.narrow( typeid( MyTransport ) );
+    CPPUNIT_ASSERT( narrowed == &transport );
+
+    narrowed = correlator.narrow( typeid( transport::filters::ResponseCorrelator ) );
+    CPPUNIT_ASSERT( narrowed == &correlator );
+
+    narrowed = correlator.narrow( typeid( correlator ) );
+    CPPUNIT_ASSERT( narrowed == &correlator );
+
+}

Modified: activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.h?rev=712590&r1=712589&r2=712590&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.h (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.h Sun Nov  9 17:09:40 2008
@@ -39,6 +39,7 @@
         CPPUNIT_TEST( testOneway );
         CPPUNIT_TEST( testTransportException );
         CPPUNIT_TEST( testMultiRequests );
+        CPPUNIT_TEST( testNarrow );
         CPPUNIT_TEST_SUITE_END();
 
     public:
@@ -277,6 +278,14 @@
                     }
                 }
             }
+
+            virtual Transport* narrow( const std::type_info& typeId ) {
+                if( typeid( *this ) == typeId ) {
+                    return this;
+                }
+
+                return NULL;
+            }
         };
 
         class MyBrokenTransport : public MyTransport{
@@ -370,6 +379,7 @@
         void testOneway();
         void testTransportException();
         void testMultiRequests();
+        void testNarrow();
 
     };