You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/12 00:39:49 UTC
svn commit: r1397341 [1/4] - in
/activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/activemq/core/
main/activemq/transport/ main/activemq/transport/correlator/
main/activemq/transport/failover/ main/activemq/transport/inactivity/
main/activemq/tr...
Author: tabish
Date: Thu Oct 11 22:39:46 2012
New Revision: 1397341
URL: http://svn.apache.org/viewvc?rev=1397341&view=rev
Log:
work for: https://issues.apache.org/jira/browse/AMQCPP-435
Do some refactoring and add in most of the bits needed for this.
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CompositeTransport.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/DefaultTransportListener.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/FutureResponse.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/FutureResponse.h
- copied, changed from r1392570, activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/FutureResponse.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.h (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/ResponseBuilder.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/AsyncCallback.cpp (with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/AsyncCallback.h (with props)
Removed:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/FutureResponse.h
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/AbstractTransportFactory.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/AbstractTransportFactory.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CompositeTransport.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/DefaultTransportListener.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFactory.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportListener.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportRegistry.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportRegistry.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransport.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransport.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/CloseTransportsTask.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/URIPool.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/URIPool.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/ReadChecker.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/ReadChecker.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/WriteChecker.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/WriteChecker.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/logging/LoggingTransport.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/logging/LoggingTransport.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/InternalCommandListener.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/InternalCommandListener.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/MockTransport.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/MockTransport.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/MockTransportFactory.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/MockTransportFactory.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/mock/ResponseBuilder.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/SslTransport.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/SslTransport.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/SslTransportFactory.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/SslTransportFactory.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransport.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransport.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransportFactory.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransportFactory.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/IOTransportTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/transport/inactivity/InactivityMonitorTest.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Thu Oct 11 22:39:46 2012
@@ -124,7 +124,12 @@ cc_sources = \
activemq/threads/Scheduler.cpp \
activemq/threads/SchedulerTimerTask.cpp \
activemq/transport/AbstractTransportFactory.cpp \
+ activemq/transport/CompositeTransport.cpp \
+ activemq/transport/DefaultTransportListener.cpp \
+ activemq/transport/FutureResponse.cpp \
activemq/transport/IOTransport.cpp \
+ activemq/transport/ResponseCallback.cpp \
+ activemq/transport/Transport.cpp \
activemq/transport/TransportFilter.cpp \
activemq/transport/TransportRegistry.cpp \
activemq/transport/correlator/ResponseCorrelator.cpp \
@@ -142,6 +147,7 @@ cc_sources = \
activemq/transport/mock/InternalCommandListener.cpp \
activemq/transport/mock/MockTransport.cpp \
activemq/transport/mock/MockTransportFactory.cpp \
+ activemq/transport/mock/ResponseBuilder.cpp \
activemq/transport/tcp/SslTransport.cpp \
activemq/transport/tcp/SslTransportFactory.cpp \
activemq/transport/tcp/TcpTransport.cpp \
@@ -241,6 +247,7 @@ cc_sources = \
activemq/wireformat/stomp/StompHelper.cpp \
activemq/wireformat/stomp/StompWireFormat.cpp \
activemq/wireformat/stomp/StompWireFormatFactory.cpp \
+ cms/AsyncCallback.cpp \
cms/BytesMessage.cpp \
cms/CMSException.cpp \
cms/CMSProperties.cpp \
@@ -648,13 +655,14 @@ h_sources = \
activemq/transport/AbstractTransportFactory.h \
activemq/transport/CompositeTransport.h \
activemq/transport/DefaultTransportListener.h \
+ activemq/transport/FutureResponse.h \
activemq/transport/IOTransport.h \
+ activemq/transport/ResponseCallback.h \
activemq/transport/Transport.h \
activemq/transport/TransportFactory.h \
activemq/transport/TransportFilter.h \
activemq/transport/TransportListener.h \
activemq/transport/TransportRegistry.h \
- activemq/transport/correlator/FutureResponse.h \
activemq/transport/correlator/ResponseCorrelator.h \
activemq/transport/failover/BackupTransport.h \
activemq/transport/failover/BackupTransportPool.h \
@@ -777,6 +785,7 @@ h_sources = \
activemq/wireformat/stomp/StompHelper.h \
activemq/wireformat/stomp/StompWireFormat.h \
activemq/wireformat/stomp/StompWireFormatFactory.h \
+ cms/AsyncCallback.h \
cms/BytesMessage.h \
cms/CMSException.h \
cms/CMSProperties.h \
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Thu Oct 11 22:39:46 2012
@@ -949,7 +949,7 @@ void ActiveMQConnection::destroyDestinat
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::onCommand(const Pointer<Command>& command) {
+void ActiveMQConnection::onCommand(const Pointer<Command> command) {
try {
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Thu Oct 11 22:39:46 2012
@@ -574,7 +574,7 @@ namespace core{
* transport.
* @param command the received command object.
*/
- virtual void onCommand(const Pointer<commands::Command>& command);
+ virtual void onCommand(const Pointer<commands::Command> command);
/**
* Event handler for an exception from a command transport.
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/AbstractTransportFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/AbstractTransportFactory.cpp?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/AbstractTransportFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/AbstractTransportFactory.cpp Thu Oct 11 22:39:46 2012
@@ -37,6 +37,11 @@ using namespace decaf::net;
using namespace decaf::util;
////////////////////////////////////////////////////////////////////////////////
+AbstractTransportFactory::~AbstractTransportFactory() {
+
+}
+
+////////////////////////////////////////////////////////////////////////////////
Pointer<WireFormat> AbstractTransportFactory::createWireFormat(
const decaf::util::Properties& properties ) {
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/AbstractTransportFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/AbstractTransportFactory.h?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/AbstractTransportFactory.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/AbstractTransportFactory.h Thu Oct 11 22:39:46 2012
@@ -41,7 +41,7 @@ namespace transport {
class AMQCPP_API AbstractTransportFactory : public TransportFactory {
public:
- virtual ~AbstractTransportFactory() {}
+ virtual ~AbstractTransportFactory();
protected:
@@ -57,7 +57,7 @@ namespace transport {
* @throws NoSuchElementException if the configured WireFormat is not found.
*/
virtual Pointer<wireformat::WireFormat> createWireFormat(
- const decaf::util::Properties& properties );
+ const decaf::util::Properties& properties);
};
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CompositeTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CompositeTransport.cpp?rev=1397341&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CompositeTransport.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CompositeTransport.cpp Thu Oct 11 22:39:46 2012
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CompositeTransport.h"
+
+using namespace activemq;
+using namespace activemq::transport;
+
+////////////////////////////////////////////////////////////////////////////////
+CompositeTransport::~CompositeTransport() {
+
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CompositeTransport.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CompositeTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CompositeTransport.h?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CompositeTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/CompositeTransport.h Thu Oct 11 22:39:46 2012
@@ -40,7 +40,7 @@ namespace transport {
class AMQCPP_API CompositeTransport: public activemq::transport::Transport {
public:
- virtual ~CompositeTransport() {}
+ virtual ~CompositeTransport();
/**
* Add a URI to the list of URI's that will represent the set of Transports
@@ -51,7 +51,7 @@ namespace transport {
* @param uris
* The new URI set to add to the set this composite maintains.
*/
- virtual void addURI( bool rebalance, const List<URI>& uris ) = 0;
+ virtual void addURI(bool rebalance, const List<URI>& uris) = 0;
/**
* Remove a URI from the set of URI's that represents the set of Transports
@@ -64,7 +64,7 @@ namespace transport {
* @param uris
* The URI set to remove from the set this composite maintains.
*/
- virtual void removeURI( bool rebalance, const List<URI>& uris ) = 0;
+ virtual void removeURI(bool rebalance, const List<URI>& uris) = 0;
};
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/DefaultTransportListener.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/DefaultTransportListener.cpp?rev=1397341&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/DefaultTransportListener.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/DefaultTransportListener.cpp Thu Oct 11 22:39:46 2012
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DefaultTransportListener.h"
+
+using namespace activemq;
+using namespace activemq::transport;
+
+////////////////////////////////////////////////////////////////////////////////
+DefaultTransportListener::~DefaultTransportListener() {
+
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/DefaultTransportListener.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/DefaultTransportListener.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/DefaultTransportListener.h?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/DefaultTransportListener.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/DefaultTransportListener.h Thu Oct 11 22:39:46 2012
@@ -33,10 +33,10 @@ namespace transport {
* A utility class that creates empty implementations of the TransportListener interface
* so that a subclass only needs to override the ones it is interested in.
*/
- class AMQCPP_API DefaultTransportListener : public TransportListener {
+ class AMQCPP_API DefaultTransportListener: public TransportListener {
public:
- virtual ~DefaultTransportListener() {}
+ virtual ~DefaultTransportListener();
/**
* Event handler for the receipt of a command. The transport passes
@@ -46,14 +46,14 @@ namespace transport {
*
* @param command the received command object.
*/
- virtual void onCommand( const Pointer<Command>& command AMQCPP_UNUSED ) {}
+ virtual void onCommand(const Pointer<Command> command AMQCPP_UNUSED) {}
/**
* Event handler for an exception from a command transport.
*
* @param ex The exception.
*/
- virtual void onException( const decaf::lang::Exception& ex AMQCPP_UNUSED ) {}
+ virtual void onException(const decaf::lang::Exception& ex AMQCPP_UNUSED) {}
/**
* The transport has suffered an interruption from which it hopes to recover
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/FutureResponse.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/FutureResponse.cpp?rev=1397341&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/FutureResponse.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/FutureResponse.cpp Thu Oct 11 22:39:46 2012
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FutureResponse.h"
+
+#include <decaf/util/concurrent/Concurrent.h>
+#include <decaf/lang/exceptions/UnsupportedOperationException.h>
+#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/util/Config.h>
+#include <typeinfo>
+
+using namespace activemq;
+using namespace activemq::transport;
+using namespace activemq::exceptions;
+using namespace activemq::commands;
+using namespace decaf;
+using namespace decaf::io;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+using namespace decaf::util::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+FutureResponse::FutureResponse() : responseLatch(1), response(), responseCallback() {}
+
+////////////////////////////////////////////////////////////////////////////////
+FutureResponse::FutureResponse(const Pointer<ResponseCallback> responseCallback) :
+ responseLatch(1), response(), responseCallback(responseCallback) {}
+
+////////////////////////////////////////////////////////////////////////////////
+FutureResponse::~FutureResponse() {}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<Response> FutureResponse::getResponse() const {
+ try {
+ this->responseLatch.await();
+ return response;
+ } catch (decaf::lang::exceptions::InterruptedException& ex) {
+ decaf::lang::Thread::currentThread()->interrupt();
+ throw decaf::io::InterruptedIOException(__FILE__, __LINE__, "Interrupted while awaiting a response");
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<Response> FutureResponse::getResponse() {
+ try {
+ this->responseLatch.await();
+ return response;
+ } catch (decaf::lang::exceptions::InterruptedException& ex) {
+ decaf::lang::Thread::currentThread()->interrupt();
+ throw decaf::io::InterruptedIOException(__FILE__, __LINE__, "Interrupted while awaiting a response");
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<Response> FutureResponse::getResponse(unsigned int timeout) const {
+ try {
+ this->responseLatch.await(timeout);
+ return response;
+ } catch (decaf::lang::exceptions::InterruptedException& ex) {
+ throw decaf::io::InterruptedIOException(__FILE__, __LINE__, "Interrupted while awaiting a response");
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<Response> FutureResponse::getResponse(unsigned int timeout) {
+ try {
+ this->responseLatch.await(timeout);
+ return response;
+ } catch (decaf::lang::exceptions::InterruptedException& ex) {
+ throw decaf::io::InterruptedIOException(__FILE__, __LINE__, "Interrupted while awaiting a response");
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FutureResponse::setResponse(Pointer<Response> response) {
+ this->response = response;
+ this->responseLatch.countDown();
+ if (responseCallback != NULL) {
+ responseCallback->onComplete(this->response);
+ }
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/FutureResponse.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Copied: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/FutureResponse.h (from r1392570, activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/FutureResponse.h)
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/FutureResponse.h?p2=activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/FutureResponse.h&p1=activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/FutureResponse.h&r1=1392570&r2=1397341&rev=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/FutureResponse.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/FutureResponse.h Thu Oct 11 22:39:46 2012
@@ -15,20 +15,24 @@
* limitations under the License.
*/
-#ifndef _ACTIVEMQ_TRANSPORT_CORRELATOR_FUTURERESPONSE_H_
-#define _ACTIVEMQ_TRANSPORT_CORRELATOR_FUTURERESPONSE_H_
+#ifndef _ACTIVEMQ_TRANSPORT_FUTURERESPONSE_H_
+#define _ACTIVEMQ_TRANSPORT_FUTURERESPONSE_H_
#include <activemq/util/Config.h>
+
+#include <activemq/transport/ResponseCallback.h>
+#include <activemq/commands/Response.h>
+#include <activemq/exceptions/ActiveMQException.h>
+
+#include <decaf/lang/Thread.h>
#include <decaf/lang/Pointer.h>
#include <decaf/util/concurrent/Mutex.h>
#include <decaf/util/concurrent/CountDownLatch.h>
-#include <activemq/commands/Response.h>
+#include <decaf/lang/exceptions/InterruptedException.h>
+#include <decaf/io/InterruptedIOException.h>
-#include <activemq/exceptions/ActiveMQException.h>
-
-namespace activemq{
-namespace transport{
-namespace correlator{
+namespace activemq {
+namespace transport {
using decaf::lang::Pointer;
using activemq::commands::Response;
@@ -43,51 +47,47 @@ namespace correlator{
mutable decaf::util::concurrent::CountDownLatch responseLatch;
Pointer<Response> response;
+ Pointer<ResponseCallback> responseCallback;
public:
- FutureResponse() : responseLatch( 1 ), response() {}
+ FutureResponse();
- virtual ~FutureResponse(){}
+ FutureResponse(const Pointer<ResponseCallback> responseCallback);
+
+ virtual ~FutureResponse();
/**
* Getters for the response property. Infinite Wait.
- * @return the response object for the request
+ *
+ * @return the response object for the request.
+ *
+ * @throws InterruptedIOException if the wait for response is interrupted.
*/
- virtual const Pointer<Response>& getResponse() const {
- this->responseLatch.await();
- return response;
- }
- virtual Pointer<Response>& getResponse() {
- this->responseLatch.await();
- return response;
- }
+ Pointer<Response> getResponse() const;
+ Pointer<Response> getResponse();
/**
* Getters for the response property. Timed Wait.
- * @param timeout - time to wait in milliseconds
+ *
+ * @param timeout
+ * Time to wait in milliseconds for a Response.
+ *
* @return the response object for the request
+ *
+ * @throws InterruptedIOException if the wait for response is interrupted.
*/
- virtual const Pointer<Response>& getResponse( unsigned int timeout ) const {
- this->responseLatch.await( timeout );
- return response;
- }
- virtual Pointer<Response>& getResponse( unsigned int timeout ) {
- this->responseLatch.await( timeout );
- return response;
- }
+ Pointer<Response> getResponse(unsigned int timeout) const;
+ Pointer<Response> getResponse(unsigned int timeout);
/**
* Setter for the response property.
* @param response the response object for the request.
*/
- virtual void setResponse( const Pointer<Response>& response ) {
- this->response = response;
- this->responseLatch.countDown();
- }
+ void setResponse(Pointer<Response> response);
};
-}}}
+}}
-#endif /*_ACTIVEMQ_TRANSPORT_CORRELATOR_FUTURERESPONSE_H_*/
+#endif /*_ACTIVEMQ_TRANSPORT_FUTURERESPONSE_H_*/
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp Thu Oct 11 22:39:46 2012
@@ -44,7 +44,7 @@ IOTransport::IOTransport() :
}
////////////////////////////////////////////////////////////////////////////////
-IOTransport::IOTransport(const Pointer<WireFormat>& wireFormat) :
+IOTransport::IOTransport(const Pointer<WireFormat> wireFormat) :
wireFormat(wireFormat), listener(NULL), inputStream(NULL), outputStream(NULL), thread(), closed(false) {
}
@@ -68,7 +68,7 @@ void IOTransport::fire(decaf::lang::Exce
}
////////////////////////////////////////////////////////////////////////////////
-void IOTransport::fire(const Pointer<Command>& command) {
+void IOTransport::fire(const Pointer<Command> command) {
try {
// If we have been closed then we don't deliver any messages that
@@ -84,7 +84,7 @@ void IOTransport::fire(const Pointer<Com
}
////////////////////////////////////////////////////////////////////////////////
-void IOTransport::oneway(const Pointer<Command>& command) {
+void IOTransport::oneway(const Pointer<Command> command) {
try {
@@ -265,11 +265,17 @@ void IOTransport::run() {
}
////////////////////////////////////////////////////////////////////////////////
-Pointer<Response> IOTransport::request(const Pointer<Command>& command AMQCPP_UNUSED) {
+Pointer<FutureResponse> IOTransport::asyncRequest(const Pointer<Command> command AMQCPP_UNUSED,
+ const Pointer<ResponseCallback> responseCallback AMQCPP_UNUSED) {
+ throw UnsupportedOperationException(__FILE__, __LINE__, "IOTransport::asyncRequest() - unsupported operation");
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<Response> IOTransport::request(const Pointer<Command> command AMQCPP_UNUSED) {
throw UnsupportedOperationException(__FILE__, __LINE__, "IOTransport::request() - unsupported operation");
}
////////////////////////////////////////////////////////////////////////////////
-Pointer<Response> IOTransport::request(const Pointer<Command>& command AMQCPP_UNUSED, unsigned int timeout AMQCPP_UNUSED) {
+Pointer<Response> IOTransport::request(const Pointer<Command> command AMQCPP_UNUSED, unsigned int timeout AMQCPP_UNUSED) {
throw UnsupportedOperationException(__FILE__, __LINE__, "IOTransport::request() - unsupported operation");
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h Thu Oct 11 22:39:46 2012
@@ -41,16 +41,13 @@ namespace transport {
using activemq::commands::Response;
/**
- * Implementation of the Transport interface that performs
- * marshaling of commands to IO streams. This class does not
- * implement the request method, it only handles oneway messages.
- * A thread polls on the input stream for in-coming commands. When
- * a command is received, the command listener is notified. The
- * polling thread is not started until the start method is called.
- * The close method will close the associated streams. Close can
- * be called explicitly by the user, but is also called in the
- * destructor. Once this object has been closed, it cannot be
- * restarted.
+ * Implementation of the Transport interface that performs marshaling of commands
+ * to IO streams. This class does not implement the request method; it only handles
+ * oneway messages. A thread polls on the input stream for incoming commands. When
+ * a command is received, the command listener is notified. The polling thread is not
+ * started until the start method is called. The close method will close the associated
+ * streams. Close can be called explicitly by the user, but is also called in the
+ * destructor. Once this object has been closed, it cannot be restarted.
*/
class AMQCPP_API IOTransport : public Transport,
public decaf::lang::Runnable {
@@ -106,7 +103,7 @@ namespace transport {
* Notify the command listener.
* @param command the command the send
*/
- void fire(const Pointer<Command>& command);
+ void fire(const Pointer<Command> command);
public:
@@ -122,7 +119,7 @@ namespace transport {
* @param wireFormat
* Data encoder / decoder to use when reading and writing.
*/
- IOTransport(const Pointer<wireformat::WireFormat>& wireFormat);
+ IOTransport(const Pointer<wireformat::WireFormat> wireFormat);
virtual ~IOTransport();
@@ -148,27 +145,35 @@ namespace transport {
public: // Transport methods
- virtual void oneway(const Pointer<Command>& command);
+ virtual void oneway(const Pointer<Command> command);
/**
* {@inheritDoc}
*
* This method always thrown an UnsupportedOperationException.
*/
- virtual Pointer<Response> request(const Pointer<Command>& command);
+ virtual Pointer<FutureResponse> asyncRequest(const Pointer<Command> command,
+ const Pointer<ResponseCallback> responseCallback);
/**
* {@inheritDoc}
*
* This method always thrown an UnsupportedOperationException.
*/
- virtual Pointer<Response> request(const Pointer<Command>& command, unsigned int timeout);
+ virtual Pointer<Response> request(const Pointer<Command> command);
+
+ /**
+ * {@inheritDoc}
+ *
+ * This method always throws an UnsupportedOperationException.
+ */
+ virtual Pointer<Response> request(const Pointer<Command> command, unsigned int timeout);
virtual Pointer<wireformat::WireFormat> getWireFormat() const {
return this->wireFormat;
}
- virtual void setWireFormat(const Pointer<wireformat::WireFormat>& wireFormat) {
+ virtual void setWireFormat(const Pointer<wireformat::WireFormat> wireFormat) {
this->wireFormat = wireFormat;
}
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.cpp?rev=1397341&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.cpp Thu Oct 11 22:39:46 2012
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ResponseCallback.h"
+
+using namespace activemq;
+using namespace activemq::transport;
+
+////////////////////////////////////////////////////////////////////////////////
+ResponseCallback::~ResponseCallback() {
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.h?rev=1397341&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.h Thu Oct 11 22:39:46 2012
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TRANSPORT_RESPONSECALLBACK_H_
+#define _ACTIVEMQ_TRANSPORT_RESPONSECALLBACK_H_
+
+#include <activemq/util/Config.h>
+#include <activemq/commands/Response.h>
+
+#include <decaf/lang/Pointer.h>
+
+namespace activemq {
+namespace transport {
+
+ /**
+ * Allows an async send to complete at a later time via a Response event.
+ */
+ class AMQCPP_API ResponseCallback {
+ private:
+
+ ResponseCallback(const ResponseCallback&);
+ ResponseCallback& operator= (const ResponseCallback&);
+
+ public:
+
+ virtual ~ResponseCallback();
+
+ public:
+
+ /**
+ * When an asynchronous operation completes, this event is fired.
+ *
+ * The provided Response can either contain the result of the operation
+ * or an exception indicating that the operation failed.
+ *
+ * @param response
+ * The result of the asynchronous operation that registered this call-back.
+ */
+ virtual void onComplete(decaf::lang::Pointer<commands::Response> response) = 0;
+
+ };
+
+}}
+
+#endif /* _ACTIVEMQ_TRANSPORT_RESPONSECALLBACK_H_ */
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCallback.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.cpp?rev=1397341&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.cpp Thu Oct 11 22:39:46 2012
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Transport.h"
+
+using namespace activemq;
+using namespace activemq::transport;
+
+////////////////////////////////////////////////////////////////////////////////
+Transport::~Transport() {
+
+}
Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.h?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/Transport.h Thu Oct 11 22:39:46 2012
@@ -27,6 +27,8 @@
#include <decaf/lang/Pointer.h>
#include <decaf/lang/exceptions/UnsupportedOperationException.h>
#include <activemq/util/Config.h>
+#include <activemq/transport/ResponseCallback.h>
+#include <activemq/transport/FutureResponse.h>
#include <activemq/commands/Command.h>
#include <activemq/commands/Response.h>
#include <typeinfo>
@@ -54,10 +56,10 @@ namespace transport{
* object when created so that they can turn the built in Commands to /
* from the required wire format encoding.
*/
- class AMQCPP_API Transport : public decaf::io::Closeable {
+ class AMQCPP_API Transport: public decaf::io::Closeable {
public:
- virtual ~Transport() {}
+ virtual ~Transport();
/**
* Starts the Transport, the send methods of a Transport will throw an exception
@@ -85,7 +87,25 @@ namespace transport{
* @throws UnsupportedOperationException if this method is not implemented
* by this transport.
*/
- virtual void oneway( const Pointer<Command>& command ) = 0;
+ virtual void oneway(const Pointer<Command> command) = 0;
+
+ /**
+ * Sends a command asynchronously, returning a FutureResponse object that the caller
+ * can use to find out the response from the broker.
+ *
+ * @param command
+ * The Command object that is to be sent out.
+ * @param responseCallback
+ * A callback object that will be notified once a response to the command is received.
+ *
+ * @return A FutureResponse instance that can be queried for the Response to the Command.
+ *
+ * @throws IOException if an exception occurs during the read of the command.
+ * @throws UnsupportedOperationException if this method is not implemented
+ * by this transport.
+ */
+ virtual Pointer<FutureResponse> asyncRequest(const Pointer<Command> command,
+ const Pointer<ResponseCallback> responseCallback) = 0;
/**
* Sends the given command to the broker and then waits for the response.
@@ -98,7 +118,7 @@ namespace transport{
* @throws UnsupportedOperationException if this method is not implemented
* by this transport.
*/
- virtual Pointer<Response> request( const Pointer<Command>& command ) = 0;
+ virtual Pointer<Response> request(const Pointer<Command> command) = 0;
/**
* Sends the given command to the broker and then waits for the response.
@@ -114,7 +134,7 @@ namespace transport{
* @throws UnsupportedOperationException if this method is not implemented
* by this transport.
*/
- virtual Pointer<Response> request( const Pointer<Command>& command, unsigned int timeout ) = 0;
+ virtual Pointer<Response> request(const Pointer<Command> command, unsigned int timeout) = 0;
/**
* Gets the WireFormat instance that is in use by this transport. In the case of
@@ -130,13 +150,13 @@ namespace transport{
* @param wireFormat
* The WireFormat the object used to encode / decode commands.
*/
- virtual void setWireFormat( const Pointer<wireformat::WireFormat>& wireFormat ) = 0;
+ virtual void setWireFormat(const Pointer<wireformat::WireFormat> wireFormat) = 0;
/**
* Sets the observer of asynchronous events from this transport.
* @param listener the listener of transport events.
*/
- virtual void setTransportListener( TransportListener* listener ) = 0;
+ virtual void setTransportListener(TransportListener* listener) = 0;
/**
* Gets the observer of asynchronous events from this transport.
@@ -153,7 +173,7 @@ namespace transport{
*
* @return the requested Object. or NULL if its not in this chain.
*/
- virtual Transport* narrow( const std::type_info& typeId ) = 0;
+ virtual Transport* narrow(const std::type_info& typeId) = 0;
/**
* Is this Transport fault tolerant, meaning that it will reconnect to
@@ -200,7 +220,7 @@ namespace transport{
*
* @throws IOException on failure or if reconnect is not supported.
*/
- virtual void reconnect( const decaf::net::URI& uri ) = 0;
+ virtual void reconnect(const decaf::net::URI& uri) = 0;
/**
* Updates the set of URIs the Transport can connect to. If the Transport
@@ -213,7 +233,7 @@ namespace transport{
*
* @throws IOException if an error occurs or updates aren't supported.
*/
- virtual void updateURIs( bool rebalance, const decaf::util::List<decaf::net::URI>& uris ) = 0;
+ virtual void updateURIs(bool rebalance, const decaf::util::List<decaf::net::URI>& uris) = 0;
};
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFactory.h?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFactory.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFactory.h Thu Oct 11 22:39:46 2012
@@ -50,7 +50,7 @@ namespace transport{
* @param location - URI location to connect to plus any properties to assign.
* @throws ActiveMQexception if an error occurs
*/
- virtual Pointer<Transport> create( const decaf::net::URI& location ) = 0;
+ virtual Pointer<Transport> create(const decaf::net::URI& location) = 0;
/**
* Creates a slimed down Transport instance which can be used in composite
@@ -58,7 +58,7 @@ namespace transport{
* @param location - URI location to connect to plus any properties to assign.
* @throws ActiveMQexception if an error occurs
*/
- virtual Pointer<Transport> createComposite( const decaf::net::URI& location ) = 0;
+ virtual Pointer<Transport> createComposite(const decaf::net::URI& location) = 0;
};
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp Thu Oct 11 22:39:46 2012
@@ -26,70 +26,71 @@ using namespace decaf::lang;
using namespace decaf::io;
////////////////////////////////////////////////////////////////////////////////
-TransportFilter::TransportFilter( const Pointer<Transport>& next ) :
- next( next ), listener( NULL ) {
-
+TransportFilter::TransportFilter(const Pointer<Transport> next) : next(next), listener(NULL) {
// Observe the nested transport for events.
- next->setTransportListener( this );
+ next->setTransportListener(this);
}
////////////////////////////////////////////////////////////////////////////////
TransportFilter::~TransportFilter() {
-
}
////////////////////////////////////////////////////////////////////////////////
-void TransportFilter::onCommand( const Pointer<Command>& command ){
- fire( command );
+void TransportFilter::onCommand(const Pointer<Command> command) {
+ fire(command);
}
////////////////////////////////////////////////////////////////////////////////
-void TransportFilter::onException( const decaf::lang::Exception& ex ) {
- fire( ex );
+void TransportFilter::onException(const decaf::lang::Exception& ex) {
+ fire(ex);
}
////////////////////////////////////////////////////////////////////////////////
-void TransportFilter::fire( const decaf::lang::Exception& ex ) {
+void TransportFilter::fire(const decaf::lang::Exception& ex) {
- if( listener != NULL ){
- try{
- listener->onException( ex );
- }catch( ... ){}
+ if (listener != NULL) {
+ try {
+ listener->onException(ex);
+ } catch (...) {
+ }
}
}
////////////////////////////////////////////////////////////////////////////////
-void TransportFilter::fire( const Pointer<Command>& command ) {
- try{
- if( listener != NULL ){
- listener->onCommand( command );
+void TransportFilter::fire(const Pointer<Command> command) {
+ try {
+ if (listener != NULL) {
+ listener->onCommand(command);
}
- }catch( ... ){}
+ } catch (...) {
+ }
}
////////////////////////////////////////////////////////////////////////////////
void TransportFilter::transportInterrupted() {
- try{
- if( listener != NULL ){
+ try {
+ if (listener != NULL) {
listener->transportInterrupted();
}
- }catch( ... ){}
+ } catch (...) {
+ }
}
////////////////////////////////////////////////////////////////////////////////
void TransportFilter::transportResumed() {
- try{
- if( listener != NULL ){
+ try {
+ if (listener != NULL) {
listener->transportResumed();
}
- }catch( ... ){}
+ } catch (...) {
+ }
}
////////////////////////////////////////////////////////////////////////////////
void TransportFilter::start() {
- if( listener == NULL ){
- throw decaf::io::IOException( __FILE__, __LINE__, "exceptionListener is invalid" );
+ if (listener == NULL) {
+ throw decaf::io::IOException(__FILE__, __LINE__, "exceptionListener is invalid");
}
// Start the delegate transport object.
@@ -104,34 +105,34 @@ void TransportFilter::stop() {
////////////////////////////////////////////////////////////////////////////////
void TransportFilter::close() {
- if( next != NULL ) {
+ if (next != NULL) {
next->close();
- next.reset( NULL );
+ next.reset(NULL);
}
listener = NULL;
}
////////////////////////////////////////////////////////////////////////////////
-Transport* TransportFilter::narrow( const std::type_info& typeId ) {
- if( typeid( *this ) == typeId ) {
+Transport* TransportFilter::narrow(const std::type_info& typeId) {
+ if (typeid( *this ) == typeId) {
return this;
- } else if( this->next != NULL ) {
- return this->next->narrow( typeId );
+ } else if (this->next != NULL) {
+ return this->next->narrow(typeId);
}
return NULL;
}
////////////////////////////////////////////////////////////////////////////////
-void TransportFilter::reconnect( const decaf::net::URI& uri ) {
+void TransportFilter::reconnect(const decaf::net::URI& uri) {
- try{
- next->reconnect( uri );
+ try {
+ next->reconnect(uri);
}
- AMQ_CATCH_RETHROW( IOException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
- AMQ_CATCHALL_THROW( IOException )
+ AMQ_CATCH_RETHROW( IOException)
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException)
+ AMQ_CATCHALL_THROW( IOException)
}
////////////////////////////////////////////////////////////////////////////////
@@ -140,6 +141,6 @@ Pointer<wireformat::WireFormat> Transpor
}
////////////////////////////////////////////////////////////////////////////////
-void TransportFilter::setWireFormat( const Pointer<wireformat::WireFormat>& wireFormat ) {
- next->setWireFormat( wireFormat );
+void TransportFilter::setWireFormat(const Pointer<wireformat::WireFormat> wireFormat) {
+ next->setWireFormat(wireFormat);
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h Thu Oct 11 22:39:46 2012
@@ -39,8 +39,7 @@ namespace transport{
*
* @since 1.0
*/
- class AMQCPP_API TransportFilter : public Transport,
- public TransportListener {
+ class AMQCPP_API TransportFilter: public Transport, public TransportListener {
protected:
/**
@@ -59,18 +58,18 @@ namespace transport{
* Notify the listener of the thrown Exception.
* @param ex - the exception to send to listeners
*/
- void fire( const decaf::lang::Exception& ex );
+ void fire(const decaf::lang::Exception& ex);
/**
* Notify the listener of the new incoming Command.
* @param command - the command to send to the listener
*/
- void fire( const Pointer<Command>& command );
+ void fire(const Pointer<Command> command);
private:
- TransportFilter( const TransportFilter& );
- TransportFilter& operator= ( const TransportFilter& );
+ TransportFilter(const TransportFilter&);
+ TransportFilter& operator=(const TransportFilter&);
public:
@@ -78,24 +77,25 @@ namespace transport{
* Constructor.
* @param next - the next Transport in the chain
*/
- TransportFilter( const Pointer<Transport>& next );
+ TransportFilter(const Pointer<Transport> next);
virtual ~TransportFilter();
- public: // TransportListener methods
+ public:
+ // TransportListener methods
/**
* Event handler for the receipt of a command.
* @param command - the received command object.
*/
- virtual void onCommand( const Pointer<Command>& command );
+ virtual void onCommand(const Pointer<Command> command);
/**
* Event handler for an exception from a command transport.
* @param ex
* The exception to handle.
*/
- virtual void onException( const decaf::lang::Exception& ex );
+ virtual void onException(const decaf::lang::Exception& ex);
/**
* The transport has suffered an interruption from which it hopes to recover
@@ -107,21 +107,27 @@ namespace transport{
*/
virtual void transportResumed();
- public: // Transport Methods.
+ public:
+ // Transport Methods.
+
+ virtual void oneway(const Pointer<Command> command) {
+ next->oneway(command);
+ }
- virtual void oneway( const Pointer<Command>& command ) {
- next->oneway( command );
+ virtual Pointer<FutureResponse> asyncRequest(const Pointer<Command> command,
+ const Pointer<ResponseCallback> responseCallback) {
+ return next->asyncRequest(command, responseCallback);
}
- virtual Pointer<Response> request( const Pointer<Command>& command ) {
- return next->request( command );
+ virtual Pointer<Response> request(const Pointer<Command> command) {
+ return next->request(command);
}
- virtual Pointer<Response> request( const Pointer<Command>& command, unsigned int timeout ) {
- return next->request( command, timeout );
+ virtual Pointer<Response> request(const Pointer<Command> command, unsigned int timeout) {
+ return next->request(command, timeout);
}
- virtual void setTransportListener( TransportListener* listener ) {
+ virtual void setTransportListener(TransportListener* listener) {
this->listener = listener;
}
@@ -131,7 +137,7 @@ namespace transport{
virtual Pointer<wireformat::WireFormat> getWireFormat() const;
- virtual void setWireFormat( const Pointer<wireformat::WireFormat>& wireFormat );
+ virtual void setWireFormat(const Pointer<wireformat::WireFormat> wireFormat);
virtual void start();
@@ -139,7 +145,7 @@ namespace transport{
virtual void close();
- virtual Transport* narrow( const std::type_info& typeId );
+ virtual Transport* narrow(const std::type_info& typeId);
virtual bool isFaultTolerant() const {
return next->isFaultTolerant();
@@ -165,10 +171,10 @@ namespace transport{
return next->getRemoteAddress();
}
- virtual void reconnect( const decaf::net::URI& uri );
+ virtual void reconnect(const decaf::net::URI& uri);
- virtual void updateURIs( bool rebalance, const decaf::util::List<decaf::net::URI>& uris ) {
- next->updateURIs( rebalance, uris );
+ virtual void updateURIs(bool rebalance, const decaf::util::List<decaf::net::URI>& uris) {
+ next->updateURIs(rebalance, uris);
}
};
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportListener.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportListener.h?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportListener.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportListener.h Thu Oct 11 22:39:46 2012
@@ -35,7 +35,7 @@ namespace transport{
/**
* A listener of asynchronous exceptions from a command transport object.
*/
- class AMQCPP_API TransportListener{
+ class AMQCPP_API TransportListener {
public:
virtual ~TransportListener() {}
@@ -49,7 +49,7 @@ namespace transport{
* @param command
* The received command object.
*/
- virtual void onCommand( const Pointer<Command>& command ) = 0;
+ virtual void onCommand(const Pointer<Command> command) = 0;
/**
* Event handler for an exception from a command transport.
@@ -57,7 +57,7 @@ namespace transport{
* @param ex
* The exception being propagated to this listener to handle.
*/
- virtual void onException( const decaf::lang::Exception& ex ) = 0;
+ virtual void onException(const decaf::lang::Exception& ex) = 0;
/**
* The transport has suffered an interruption from which it hopes to recover
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportRegistry.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportRegistry.cpp?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportRegistry.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportRegistry.cpp Thu Oct 11 22:39:46 2012
@@ -36,34 +36,32 @@ TransportRegistry::TransportRegistry() :
////////////////////////////////////////////////////////////////////////////////
TransportRegistry::~TransportRegistry() {
-
- try{
+ try {
this->unregisterAllFactories();
- } catch(...) {}
+ }
+ AMQ_CATCHALL_NOTHROW()
}
////////////////////////////////////////////////////////////////////////////////
-TransportFactory* TransportRegistry::findFactory( const std::string& name ) const {
+TransportFactory* TransportRegistry::findFactory(const std::string& name) const {
- if( !this->registry.containsKey( name ) ) {
- throw NoSuchElementException( __FILE__, __LINE__,
- "No Matching Factory Registered for format := %s", name.c_str() );
+ if (!this->registry.containsKey(name)) {
+ throw NoSuchElementException(__FILE__, __LINE__,
+ "No Matching Factory Registered for format := %s", name.c_str());
}
- return this->registry.get( name );
+ return this->registry.get(name);
}
////////////////////////////////////////////////////////////////////////////////
void TransportRegistry::registerFactory(const std::string& name, TransportFactory* factory) {
if (name == "") {
- throw IllegalArgumentException( __FILE__, __LINE__,
- "TransportFactory name cannot be the empty string" );
+ throw IllegalArgumentException(__FILE__, __LINE__, "TransportFactory name cannot be the empty string");
}
if (factory == NULL) {
- throw NullPointerException( __FILE__, __LINE__,
- "Supplied TransportFactory pointer was NULL" );
+ throw NullPointerException(__FILE__, __LINE__, "Supplied TransportFactory pointer was NULL");
}
this->registry.put(name, factory);
@@ -80,7 +78,7 @@ void TransportRegistry::unregisterFactor
////////////////////////////////////////////////////////////////////////////////
void TransportRegistry::unregisterAllFactories() {
- Pointer< Iterator<TransportFactory*> > iterator(this->registry.values().iterator());
+ Pointer<Iterator<TransportFactory*> > iterator(this->registry.values().iterator());
while (iterator->hasNext()) {
delete iterator->next();
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportRegistry.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportRegistry.h?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportRegistry.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportRegistry.h Thu Oct 11 22:39:46 2012
@@ -53,10 +53,10 @@ namespace transport {
TransportRegistry();
// Hidden Copy Constructor
- TransportRegistry( const TransportRegistry& registry );
+ TransportRegistry(const TransportRegistry& registry);
// Hidden Assignment operator
- TransportRegistry& operator=( const TransportRegistry& registry );
+ TransportRegistry& operator=(const TransportRegistry& registry);
public:
@@ -74,7 +74,7 @@ namespace transport {
*
* @throws NoSuchElementException if no factory is registered with that name.
*/
- TransportFactory* findFactory( const std::string& name ) const;
+ TransportFactory* findFactory(const std::string& name) const;
/**
* Registers a new TransportFactory with this Registry. If a Factory with the
@@ -90,7 +90,7 @@ namespace transport {
* @throws IllegalArgumentException is name is the empty string.
* @throws NullPointerException if the Factory is Null.
*/
- void registerFactory( const std::string& name, TransportFactory* factory );
+ void registerFactory(const std::string& name, TransportFactory* factory);
/**
* Unregisters the Factory with the given name and deletes that instance of the
@@ -99,7 +99,7 @@ namespace transport {
* @param name
* Name of the Factory to unregister and destroy
*/
- void unregisterFactory( const std::string& name );
+ void unregisterFactory(const std::string& name);
/**
* Removes all Factories and deletes the instances of the Factory objects.
@@ -114,7 +114,8 @@ namespace transport {
*/
std::vector<std::string> getTransportNames() const;
- public: // Static methods
+ public:
+ // Static methods
/**
* Gets the single instance of the TransportRegistry
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp Thu Oct 11 22:39:46 2012
@@ -25,7 +25,7 @@
#include <activemq/commands/Response.h>
#include <activemq/commands/ExceptionResponse.h>
-#include <activemq/transport/correlator/FutureResponse.h>
+#include <activemq/transport/FutureResponse.h>
using namespace std;
using namespace activemq;
@@ -93,8 +93,7 @@ namespace correlator{
public:
- CorrelatorData() : nextCommandId(1), requestMap(), mapMutex(), closed(true), priorError(NULL) {
- }
+ CorrelatorData() : nextCommandId(1), requestMap(), mapMutex(), closed(true), priorError(NULL) {}
};
@@ -102,11 +101,11 @@ namespace correlator{
////////////////////////////////////////////////////////////////////////////////
-ResponseCorrelator::ResponseCorrelator(const Pointer<Transport>& next) : TransportFilter(next), impl(new CorrelatorData) {
+ResponseCorrelator::ResponseCorrelator(Pointer<Transport> next) : TransportFilter(next), impl(new CorrelatorData) {
}
////////////////////////////////////////////////////////////////////////////////
-ResponseCorrelator::~ResponseCorrelator(){
+ResponseCorrelator::~ResponseCorrelator() {
// Close the transport and destroy it.
try {
@@ -118,7 +117,7 @@ ResponseCorrelator::~ResponseCorrelator(
}
////////////////////////////////////////////////////////////////////////////////
-void ResponseCorrelator::oneway(const Pointer<Command>& command) {
+void ResponseCorrelator::oneway(const Pointer<Command> command) {
try {
command->setCommandId(this->impl->nextCommandId.getAndIncrement());
@@ -130,15 +129,68 @@ void ResponseCorrelator::oneway(const Po
next->oneway(command);
}
- AMQ_CATCH_RETHROW( UnsupportedOperationException )
- AMQ_CATCH_RETHROW( IOException )
- AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, IOException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
- AMQ_CATCHALL_THROW( IOException )
+ AMQ_CATCH_RETHROW(UnsupportedOperationException)
+ AMQ_CATCH_RETHROW(IOException)
+ AMQ_CATCH_EXCEPTION_CONVERT(ActiveMQException, IOException)
+ AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
+ AMQ_CATCHALL_THROW(IOException)
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Sends a command that requires a response and returns the FutureResponse that
+// was registered for it in the correlator's request map.
+//
+// @param command
+//      The command to send; it is assigned the next command id and flagged as
+//      requiring a response before being passed to next->oneway().
+// @param responseCallback
+//      Handed to the FutureResponse constructor.
+//
+// @throws UnsupportedOperationException always, for now: the guard at the top of
+//         the body fires unconditionally, so the code after it is unreachable.
+// @throws IOException if a prior transport error was recorded, or if no valid
+//         response is obtained for the command.
+//
+// NOTE(review): once the guard is removed this method still waits on
+// futureResponse->getResponse() before returning, which looks synchronous for an
+// "async" request - confirm the intended behaviour before enabling it.
+Pointer<FutureResponse> ResponseCorrelator::asyncRequest(const Pointer<Command> command, const Pointer<ResponseCallback> responseCallback) {
+
+ // Deliberate guard: the implementation below is not yet enabled.
+ throw UnsupportedOperationException(__FILE__, __LINE__, "Not yet ready for use.");
+
+ try {
+
+ command->setCommandId(this->impl->nextCommandId.getAndIncrement());
+ command->setResponseRequired(true);
+
+ // Add a future response object to the map indexed by this command id.
+ Pointer<FutureResponse> futureResponse(new FutureResponse(responseCallback));
+ Pointer<Exception> priorError;
+
+ // Snapshot the prior error and register the future under the same lock so
+ // that the check and the insert are seen together.
+ synchronized(&this->impl->mapMutex) {
+ priorError = this->impl->priorError;
+ if (priorError == NULL) {
+ this->impl->requestMap.insert(
+ make_pair((unsigned int) command->getCommandId(), futureResponse));
+ }
+ }
+
+ if (priorError != NULL) {
+ //futureResponse->setResponse(new ExceptionResponse(priorError));
+ // Use the snapshot taken under the lock; the shared member must not be
+ // re-read here because mapMutex is no longer held.
+ throw IOException(__FILE__, __LINE__, priorError->getMessage().c_str());
+ }
+
+ // The finalizer will cleanup the map even if an exception is thrown.
+ ResponseFinalizer finalizer(&this->impl->mapMutex, command->getCommandId(), &this->impl->requestMap);
+
+ // Wait to be notified of the response via the futureResponse object.
+ Pointer<commands::Response> response;
+
+ // Send the request.
+ next->oneway(command);
+
+ // Get the response.
+ response = futureResponse->getResponse();
+
+ if (response == NULL) {
+ throw IOException(__FILE__, __LINE__,
+ "No valid response received for command: %s, check broker.", command->toString().c_str());
+ }
+
+ return futureResponse;
+ }
+ AMQ_CATCH_RETHROW(UnsupportedOperationException)
+ AMQ_CATCH_RETHROW(IOException)
+ AMQ_CATCH_EXCEPTION_CONVERT(ActiveMQException, IOException)
+ AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
+ AMQ_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////
-Pointer<Response> ResponseCorrelator::request(const Pointer<Command>& command) {
+Pointer<Response> ResponseCorrelator::request(const Pointer<Command> command) {
try {
@@ -180,15 +232,15 @@ Pointer<Response> ResponseCorrelator::re
return response;
}
- AMQ_CATCH_RETHROW( UnsupportedOperationException )
- AMQ_CATCH_RETHROW( IOException )
- AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, IOException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
- AMQ_CATCHALL_THROW( IOException )
+ AMQ_CATCH_RETHROW(UnsupportedOperationException)
+ AMQ_CATCH_RETHROW(IOException)
+ AMQ_CATCH_EXCEPTION_CONVERT(ActiveMQException, IOException)
+ AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
+ AMQ_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////
-Pointer<Response> ResponseCorrelator::request(const Pointer<Command>& command, unsigned int timeout) {
+Pointer<Response> ResponseCorrelator::request(const Pointer<Command> command, unsigned int timeout) {
try {
@@ -230,15 +282,15 @@ Pointer<Response> ResponseCorrelator::re
return response;
}
- AMQ_CATCH_RETHROW( UnsupportedOperationException )
- AMQ_CATCH_RETHROW( IOException )
- AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, IOException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
- AMQ_CATCHALL_THROW( IOException )
+ AMQ_CATCH_RETHROW(UnsupportedOperationException)
+ AMQ_CATCH_RETHROW(IOException)
+ AMQ_CATCH_EXCEPTION_CONVERT(ActiveMQException, IOException)
+ AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
+ AMQ_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////
-void ResponseCorrelator::onCommand(const Pointer<Command>& command) {
+void ResponseCorrelator::onCommand(const Pointer<Command> command) {
// Let's see if the incoming command is a response, if not we just pass it along
// and allow outstanding requests to keep waiting without stalling control commands.
@@ -290,9 +342,9 @@ void ResponseCorrelator::start() {
// Mark it as open.
this->impl->closed = false;
}
- AMQ_CATCH_RETHROW( IOException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
- AMQ_CATCHALL_THROW( IOException )
+ AMQ_CATCH_RETHROW(IOException)
+ AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
+ AMQ_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////
@@ -308,9 +360,9 @@ void ResponseCorrelator::close() {
this->impl->closed = true;
}
- AMQ_CATCH_RETHROW( IOException )
- AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
- AMQ_CATCHALL_THROW( IOException )
+ AMQ_CATCH_RETHROW(IOException)
+ AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
+ AMQ_CATCHALL_THROW(IOException)
}
////////////////////////////////////////////////////////////////////////////////
@@ -320,7 +372,7 @@ void ResponseCorrelator::onException(con
}
////////////////////////////////////////////////////////////////////////////////
-void ResponseCorrelator::dispose(const Pointer<Exception> error) {
+void ResponseCorrelator::dispose(Pointer<Exception> error) {
ArrayList<Pointer<FutureResponse> > requests;
synchronized(&this->impl->mapMutex){
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.h?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.h Thu Oct 11 22:39:46 2012
@@ -20,15 +20,17 @@
#include <activemq/util/Config.h>
#include <activemq/transport/TransportFilter.h>
+#include <activemq/transport/ResponseCallback.h>
+#include <activemq/transport/FutureResponse.h>
#include <activemq/commands/Command.h>
#include <activemq/commands/Response.h>
#include <decaf/lang/Exception.h>
#include <decaf/lang/Pointer.h>
-namespace activemq{
-namespace transport{
-namespace correlator{
+namespace activemq {
+namespace transport {
+namespace correlator {
using decaf::lang::Pointer;
using activemq::commands::Command;
@@ -56,17 +58,20 @@ namespace correlator{
*
* @throws NullPointerException if next is NULL.
*/
- ResponseCorrelator(const Pointer<Transport>& next);
+ ResponseCorrelator(Pointer<Transport> next);
virtual ~ResponseCorrelator();
public: // Transport Methods
- virtual void oneway(const Pointer<Command>& command);
+ virtual void oneway(const Pointer<Command> command);
- virtual Pointer<Response> request(const Pointer<Command>& command);
+ virtual Pointer<FutureResponse> asyncRequest(const Pointer<Command> command,
+ const Pointer<ResponseCallback> responseCallback);
- virtual Pointer<Response> request(const Pointer<Command>& command, unsigned int timeout);
+ virtual Pointer<Response> request(const Pointer<Command> command);
+
+ virtual Pointer<Response> request(const Pointer<Command> command, unsigned int timeout);
virtual void start();
@@ -81,7 +86,7 @@ namespace correlator{
* @param command
* The command received from the nested transport.
*/
- virtual void onCommand(const Pointer<Command>& command);
+ virtual void onCommand(const Pointer<Command> command);
/**
* Event handler for an exception from a command transport.
@@ -95,7 +100,7 @@ namespace correlator{
private:
- void dispose(const Pointer<decaf::lang::Exception> ex);
+ void dispose(Pointer<decaf::lang::Exception> ex);
};
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransport.cpp?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransport.cpp Thu Oct 11 22:39:46 2012
@@ -24,9 +24,8 @@ using namespace activemq::transport;
using namespace activemq::transport::failover;
////////////////////////////////////////////////////////////////////////////////
-BackupTransport::BackupTransport( BackupTransportPool* parent ) :
- parent( parent ), transport(), uri(), closed( true ) {
-
+BackupTransport::BackupTransport(BackupTransportPool* parent) :
+ parent(parent), transport(), uri(), closed(true) {
}
////////////////////////////////////////////////////////////////////////////////
@@ -34,11 +33,11 @@ BackupTransport::~BackupTransport() {
}
////////////////////////////////////////////////////////////////////////////////
-void BackupTransport::onException( const decaf::lang::Exception& ex AMQCPP_UNUSED ) {
+void BackupTransport::onException(const decaf::lang::Exception& ex AMQCPP_UNUSED) {
this->closed = true;
- if( this->parent != NULL ) {
- this->parent->onBackupTransportFailure( this );
+ if (this->parent != NULL) {
+ this->parent->onBackupTransportFailure(this);
}
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransport.h?rev=1397341&r1=1397340&r2=1397341&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/BackupTransport.h Thu Oct 11 22:39:46 2012
@@ -34,7 +34,7 @@ namespace failover {
class BackupTransportPool;
- class AMQCPP_API BackupTransport : public DefaultTransportListener {
+ class AMQCPP_API BackupTransport: public DefaultTransportListener {
private:
// The parent of this Backup
@@ -51,12 +51,12 @@ namespace failover {
private:
- BackupTransport( const BackupTransport& );
- BackupTransport& operator= ( const BackupTransport& );
+ BackupTransport(const BackupTransport&);
+ BackupTransport& operator=(const BackupTransport&);
public:
- BackupTransport( BackupTransportPool* failover );
+ BackupTransport(BackupTransportPool* failover);
virtual ~BackupTransport();
@@ -71,7 +71,7 @@ namespace failover {
/**
* Sets the URI assigned to this Transport.
*/
- void setUri( const decaf::net::URI& uri ) {
+ void setUri(const decaf::net::URI& uri) {
this->uri = uri;
}
@@ -90,11 +90,11 @@ namespace failover {
* @param transport
* The transport to hold.
*/
- void setTransport( const Pointer<Transport>& transport ) {
+ void setTransport(const Pointer<Transport> transport) {
this->transport = transport;
- if( this->transport != NULL ) {
- this->transport->setTransportListener( this );
+ if (this->transport != NULL) {
+ this->transport->setTransportListener(this);
}
}
@@ -107,7 +107,7 @@ namespace failover {
* @param ex
* The exception that was passed to this listener to handle.
*/
- virtual void onException( const decaf::lang::Exception& ex );
+ virtual void onException(const decaf::lang::Exception& ex);
/**
* Has the Transport been shutdown and no longer usable.
@@ -122,7 +122,7 @@ namespace failover {
* Sets the closed flag on this Transport.
* @param value - true for closed.
*/
- void setClosed( bool value ) {
+ void setClosed(bool value) {
this->closed = value;
}