You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2007/06/15 22:02:40 UTC
svn commit: r547774 - in
/activemq/activemq-cpp/trunk/src/main/activemq/connector:
openwire/OpenWireFormatNegotiator.cpp stomp/StompConnector.cpp
Author: tabish
Date: Fri Jun 15 13:02:39 2007
New Revision: 547774
URL: http://svn.apache.org/viewvc?view=rev&rev=547774
Log:
https://issues.apache.org/activemq/browse/AMQCPP-131
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp
activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp
Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp?view=diff&rev=547774&r1=547773&r2=547774
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp Fri Jun 15 13:02:39 2007
@@ -1,232 +1,232 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "OpenWireFormatNegotiator.h"
-
-#include <activemq/connector/openwire/commands/DataStructure.h>
-#include <activemq/connector/openwire/commands/WireFormatInfo.h>
-
-using namespace std;
-using namespace activemq;
-using namespace activemq::exceptions;
-using namespace activemq::transport;
-using namespace activemq::concurrent;
-using namespace activemq::connector;
-using namespace activemq::connector::openwire;
-using namespace activemq::connector::openwire::commands;
-
-////////////////////////////////////////////////////////////////////////////////
-OpenWireFormatNegotiator::OpenWireFormatNegotiator( OpenWireFormat* openWireFormat,
- Transport* next,
- bool own ) :
- TransportFilter( next, own ),
- wireInfoSentDownLatch(1),
- readyCountDownLatch(1)
-{
- this->firstTime = true;
- this->openWireFormat = openWireFormat;
- this->closed = true;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-OpenWireFormatNegotiator::~OpenWireFormatNegotiator()
-{
- // Close the transport and destroy it.
- close();
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void OpenWireFormatNegotiator::oneway( Command* command )
- throw( CommandIOException, exceptions::UnsupportedOperationException ) {
-
- try{
-
- if( closed || next == NULL ){
- throw CommandIOException(
- __FILE__, __LINE__,
- "OpenWireFormatNegotiator::oneway - transport already closed" );
- }
-
- if( !readyCountDownLatch.await( negotiationTimeout ) ) {
- throw CommandIOException(
- __FILE__,
- __LINE__,
- "OpenWireFormatNegotiator::oneway"
- "Wire format negociation timeout: peer did not "
- "send his wire format." );
- }
-
- next->oneway( command );
- }
- AMQ_CATCH_RETHROW( exceptions::UnsupportedOperationException )
- AMQ_CATCH_RETHROW( CommandIOException )
- AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, CommandIOException )
- AMQ_CATCHALL_THROW( CommandIOException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-Response* OpenWireFormatNegotiator::request( Command* command )
- throw( CommandIOException, exceptions::UnsupportedOperationException ) {
-
- try{
-
- if( closed || next == NULL ){
- throw CommandIOException(
- __FILE__, __LINE__,
- "OpenWireFormatNegotiator::request - transport already closed" );
- }
-
- if( !readyCountDownLatch.await( negotiationTimeout ) ) {
- throw CommandIOException(
- __FILE__,
- __LINE__,
- "OpenWireFormatNegotiator::request"
- "Wire format negociation timeout: peer did not "
- "send his wire format." );
- }
-
- return next->request( command );
- }
- AMQ_CATCH_RETHROW( exceptions::UnsupportedOperationException )
- AMQ_CATCH_RETHROW( CommandIOException )
- AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, CommandIOException )
- AMQ_CATCHALL_THROW( CommandIOException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void OpenWireFormatNegotiator::onCommand( Command* command ) {
-
- DataStructure* dataStructure =
- dynamic_cast<DataStructure*>( command );
-
- if( dataStructure != NULL &&
- dataStructure->getDataStructureType() == WireFormatInfo::ID_WIREFORMATINFO ) {
-
- WireFormatInfo* info = dynamic_cast<WireFormatInfo*>( dataStructure );
-
- try {
-
- if( !info->isValid() ) {
- throw CommandIOException(
- __FILE__,
- __LINE__,
- "OpenWireFormatNegotiator::onCommand"
- "Remote wire format magic is invalid" );
- }
-
- wireInfoSentDownLatch.await( negotiationTimeout );
- openWireFormat->renegotiateWireFormat( info );
-
- readyCountDownLatch.countDown();
-
- } catch( exceptions::ActiveMQException& ex ) {
-
- readyCountDownLatch.countDown();
- fire( ex );
- }
- }
-
- // Send along to the next interested party.
- fire( command );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void OpenWireFormatNegotiator::onTransportException(
- Transport* source AMQCPP_UNUSED,
- const exceptions::ActiveMQException& ex ) {
-
- readyCountDownLatch.countDown();
- fire( ex );
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void OpenWireFormatNegotiator::start() throw( cms::CMSException ){
-
- /**
- * We're already started.
- */
- if( !closed ){
- return;
- }
-
- if( commandlistener == NULL ){
- throw exceptions::ActiveMQException(
- __FILE__, __LINE__,
- "OpenWireFormatNegotiator::start - "
- "commandListener is invalid" );
- }
-
- if( exceptionListener == NULL ){
- throw exceptions::ActiveMQException(
- __FILE__, __LINE__,
- "OpenWireFormatNegotiator::start - "
- "exceptionListener is invalid" );
- }
-
- if( next == NULL ){
- throw exceptions::ActiveMQException(
- __FILE__, __LINE__,
- "OpenWireFormatNegotiator::start - "
- "next transport is NULL" );
- }
-
- if( openWireFormat == NULL ){
- throw exceptions::ActiveMQException(
- __FILE__, __LINE__,
- "OpenWireFormatNegotiator::start - "
- "openWireFormat is NULL" );
- }
-
- // Start the delegate transport object.
- next->start();
-
- if( firstTime == true ) {
-
- try {
-
- // The First Time is now over with
- firstTime = false;
-
- // We first send the WireFormat that we'd prefer.
- next->oneway( openWireFormat->getPreferedWireFormatInfo() );
-
- // Mark the latch
- wireInfoSentDownLatch.countDown();
-
- } catch( ActiveMQException& ex ) {
-
- // Mark the latch
- wireInfoSentDownLatch.countDown();
- ex.setMark( __FILE__, __LINE__ );
- throw ex;
- }
- }
-
- // Mark it as open.
- closed = false;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void OpenWireFormatNegotiator::close() throw( cms::CMSException ){
-
- if( !closed && next != NULL ){
- next->close();
- }
-
- closed = true;
-}
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OpenWireFormatNegotiator.h"
+
+#include <activemq/connector/openwire/commands/DataStructure.h>
+#include <activemq/connector/openwire/commands/WireFormatInfo.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::exceptions;
+using namespace activemq::transport;
+using namespace activemq::concurrent;
+using namespace activemq::connector;
+using namespace activemq::connector::openwire;
+using namespace activemq::connector::openwire::commands;
+
+////////////////////////////////////////////////////////////////////////////////
+OpenWireFormatNegotiator::OpenWireFormatNegotiator( OpenWireFormat* openWireFormat,
+ Transport* next,
+ bool own ) :
+ TransportFilter( next, own ),
+ wireInfoSentDownLatch(1),
+ readyCountDownLatch(1)
+{
+ this->firstTime = true;
+ this->openWireFormat = openWireFormat;
+ this->closed = true;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+OpenWireFormatNegotiator::~OpenWireFormatNegotiator()
+{
+ // Close the transport and destroy it.
+ close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireFormatNegotiator::oneway( Command* command )
+ throw( CommandIOException, exceptions::UnsupportedOperationException ) {
+
+ try{
+
+ if( closed || next == NULL ){
+ throw CommandIOException(
+ __FILE__, __LINE__,
+ "OpenWireFormatNegotiator::oneway - transport already closed" );
+ }
+
+ if( !readyCountDownLatch.await( negotiationTimeout ) ) {
+ throw CommandIOException(
+ __FILE__,
+ __LINE__,
+ "OpenWireFormatNegotiator::oneway"
+ "Wire format negotiation timeout: peer did not "
+ "send his wire format." );
+ }
+
+ next->oneway( command );
+ }
+ AMQ_CATCH_RETHROW( exceptions::UnsupportedOperationException )
+ AMQ_CATCH_RETHROW( CommandIOException )
+ AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, CommandIOException )
+ AMQ_CATCHALL_THROW( CommandIOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Response* OpenWireFormatNegotiator::request( Command* command )
+ throw( CommandIOException, exceptions::UnsupportedOperationException ) {
+
+ try{
+
+ if( closed || next == NULL ){
+ throw CommandIOException(
+ __FILE__, __LINE__,
+ "OpenWireFormatNegotiator::request - transport already closed" );
+ }
+
+ if( !readyCountDownLatch.await( negotiationTimeout ) ) {
+ throw CommandIOException(
+ __FILE__,
+ __LINE__,
+ "OpenWireFormatNegotiator::request"
+ "Wire format negotiation timeout: peer did not "
+ "send his wire format." );
+ }
+
+ return next->request( command );
+ }
+ AMQ_CATCH_RETHROW( exceptions::UnsupportedOperationException )
+ AMQ_CATCH_RETHROW( CommandIOException )
+ AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, CommandIOException )
+ AMQ_CATCHALL_THROW( CommandIOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireFormatNegotiator::onCommand( Command* command ) {
+
+ DataStructure* dataStructure =
+ dynamic_cast<DataStructure*>( command );
+
+ if( dataStructure != NULL &&
+ dataStructure->getDataStructureType() == WireFormatInfo::ID_WIREFORMATINFO ) {
+
+ WireFormatInfo* info = dynamic_cast<WireFormatInfo*>( dataStructure );
+
+ try {
+
+ if( !info->isValid() ) {
+ throw CommandIOException(
+ __FILE__,
+ __LINE__,
+ "OpenWireFormatNegotiator::onCommand"
+ "Remote wire format magic is invalid" );
+ }
+
+ wireInfoSentDownLatch.await( negotiationTimeout );
+ openWireFormat->renegotiateWireFormat( info );
+
+ readyCountDownLatch.countDown();
+
+ } catch( exceptions::ActiveMQException& ex ) {
+
+ readyCountDownLatch.countDown();
+ fire( ex );
+ }
+ }
+
+ // Send along to the next interested party.
+ fire( command );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireFormatNegotiator::onTransportException(
+ Transport* source AMQCPP_UNUSED,
+ const exceptions::ActiveMQException& ex ) {
+
+ readyCountDownLatch.countDown();
+ fire( ex );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireFormatNegotiator::start() throw( cms::CMSException ){
+
+ /**
+ * We're already started.
+ */
+ if( !closed ){
+ return;
+ }
+
+ if( commandlistener == NULL ){
+ throw exceptions::ActiveMQException(
+ __FILE__, __LINE__,
+ "OpenWireFormatNegotiator::start - "
+ "commandListener is invalid" );
+ }
+
+ if( exceptionListener == NULL ){
+ throw exceptions::ActiveMQException(
+ __FILE__, __LINE__,
+ "OpenWireFormatNegotiator::start - "
+ "exceptionListener is invalid" );
+ }
+
+ if( next == NULL ){
+ throw exceptions::ActiveMQException(
+ __FILE__, __LINE__,
+ "OpenWireFormatNegotiator::start - "
+ "next transport is NULL" );
+ }
+
+ if( openWireFormat == NULL ){
+ throw exceptions::ActiveMQException(
+ __FILE__, __LINE__,
+ "OpenWireFormatNegotiator::start - "
+ "openWireFormat is NULL" );
+ }
+
+ // Start the delegate transport object.
+ next->start();
+
+ if( firstTime == true ) {
+
+ try {
+
+ // The First Time is now over with
+ firstTime = false;
+
+ // We first send the WireFormat that we'd prefer.
+ next->oneway( openWireFormat->getPreferedWireFormatInfo() );
+
+ // Mark the latch
+ wireInfoSentDownLatch.countDown();
+
+ } catch( ActiveMQException& ex ) {
+
+ // Mark the latch
+ wireInfoSentDownLatch.countDown();
+ ex.setMark( __FILE__, __LINE__ );
+ throw ex;
+ }
+ }
+
+ // Mark it as open.
+ closed = false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireFormatNegotiator::close() throw( cms::CMSException ){
+
+ if( !closed && next != NULL ){
+ next->close();
+ }
+
+ closed = true;
+}
+
Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp?view=diff&rev=547774&r1=547773&r2=547774
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp Fri Jun 15 13:02:39 2007
@@ -413,6 +413,7 @@
"StompConnector::createTemporaryTopic - No Stomp Support");
}
AMQ_CATCH_RETHROW( ConnectorException )
+ AMQ_CATCH_RETHROW( UnsupportedOperationException )
AMQ_CATCHALL_THROW( ConnectorException );
return NULL;
}
@@ -429,6 +430,7 @@
"StompConnector::createTemporaryQueue - No Stomp Support");
}
AMQ_CATCH_RETHROW( ConnectorException )
+ AMQ_CATCH_RETHROW( UnsupportedOperationException )
AMQ_CATCHALL_THROW( ConnectorException );
return NULL;
}
@@ -716,9 +718,10 @@
{
throw UnsupportedOperationException(
__FILE__, __LINE__,
- "StompConnector::createTemporaryQueue - No Stomp Support");
+ "StompConnector::createMapMessage - No Stomp Support");
}
AMQ_CATCH_RETHROW( ConnectorException )
+ AMQ_CATCH_RETHROW( UnsupportedOperationException )
AMQ_CATCHALL_THROW( ConnectorException );
return NULL;
}
@@ -731,9 +734,10 @@
{
throw UnsupportedOperationException(
__FILE__, __LINE__,
- "StompConnector::createTemporaryQueue - No Stomp Support");
+ "StompConnector::unsubscribe - No Stomp Support");
}
AMQ_CATCH_RETHROW( ConnectorException )
+ AMQ_CATCH_RETHROW( UnsupportedOperationException )
AMQ_CATCHALL_THROW( ConnectorException );
}