You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/01 15:36:32 UTC
svn commit: r1392319 [1/2] - in /zookeeper/bookkeeper/trunk: ./
hedwig-client/src/main/cpp/ hedwig-client/src/main/cpp/inc/hedwig/
hedwig-client/src/main/cpp/lib/ hedwig-client/src/main/cpp/scripts/
hedwig-client/src/main/cpp/test/
Author: ivank
Date: Mon Oct 1 13:36:31 2012
New Revision: 1392319
URL: http://svn.apache.org/viewvc?rev=1392319&view=rev
Log:
BOOKKEEPER-143: Add SSL support for hedwig cpp client (sijie via ivank)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/Makefile.am
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/configure.ac
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/client.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/client.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.h
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/log4cxx.conf
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/server-control.sh
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/tester.sh
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/main.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messageboundtest.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messagefiltertest.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/pubsubtest.cpp
zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/util.h
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Oct 1 13:36:31 2012
@@ -176,6 +176,8 @@ Trunk (unreleased changes)
BOOKKEEPER-364: re-factor hedwig java client to support both one-subscription-per-channel and multiplex-subscriptions-per-channel. (sijie via ivank)
+ BOOKKEEPER-143: Add SSL support for hedwig cpp client (sijie via ivank)
+
Release 4.1.0 - 2012-06-07
Non-backward compatible changes:
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/Makefile.am
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/Makefile.am?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/Makefile.am (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/Makefile.am Mon Oct 1 13:36:31 2012
@@ -27,3 +27,12 @@ pkgconfigdir = $(libdir)/pkgconfig
nodist_pkgconfig_DATA = hedwig-0.1.pc
EXTRA_DIST = $(DX_CONFIG) doc/html
+
+check:
+ cd test; make check
+
+sslcheck:
+ cd test; make sslcheck
+
+simplecheck:
+ cd test; make simplecheck
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/configure.ac
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/configure.ac?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/configure.ac (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/configure.ac Mon Oct 1 13:36:31 2012
@@ -26,7 +26,7 @@ AC_LANG([C++])
AC_CONFIG_FILES([Makefile lib/Makefile test/Makefile hedwig-0.1.pc])
AC_PROG_LIBTOOL
AC_CONFIG_MACRO_DIR([m4])
-PKG_CHECK_MODULES([DEPS], [liblog4cxx protobuf])
+PKG_CHECK_MODULES([DEPS], [liblog4cxx protobuf openssl])
GTEST_LIB_CHECK([1.5.0], [AC_MSG_RESULT([GoogleTest found, Tests Enabled])],
[AC_MSG_WARN([GoogleTest not found, Tests disabled])])
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/client.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/client.h?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/client.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/inc/hedwig/client.h Mon Oct 1 13:36:31 2012
@@ -47,6 +47,8 @@ namespace Hedwig {
static const std::string SYNC_REQUEST_TIMEOUT;
static const std::string SUBSCRIBER_AUTOCONSUME;
static const std::string NUM_DISPATCH_THREADS;
+ static const std::string RUN_AS_SSL_MODE;
+ static const std::string SSL_PEM_FILE;
/**
* The maximum number of messages the hub will queue for subscriptions
* created using this configuration. The hub will always queue the most
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.cpp Mon Oct 1 13:36:31 2012
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
#ifdef HAVE_CONFIG_H
#include <config.h>
@@ -48,63 +47,109 @@ static log4cxx::LoggerPtr logger(log4cxx
using namespace Hedwig;
-DuplexChannel::DuplexChannel(EventDispatcher& dispatcher, const HostAddress& addr,
- const Configuration& cfg, const ChannelHandlerPtr& handler)
- : dispatcher(dispatcher), address(addr), handler(handler), service(dispatcher.getService()),
- socket(service), instream(&in_buf), copy_buf(NULL), copy_buf_length(0),
- state(UNINITIALISED), receiving(false), reading(false), sending(false)
-{
- LOG4CXX_DEBUG(logger, "Creating DuplexChannel(" << this << ")");
+const bool DEFAULT_SSL_ENABLED = false;
+const std::string DEFAULT_SSL_PEM_FILE = "";
+
+AbstractDuplexChannel::AbstractDuplexChannel(IOServicePtr& service,
+ const HostAddress& addr,
+ const ChannelHandlerPtr& handler)
+ : address(addr), handler(handler), service(service->getService()),
+ instream(&in_buf), copy_buf(NULL), copy_buf_length(0),
+ state(UNINITIALISED), receiving(false), reading(false), sending(false),
+ closed(false)
+{}
+
+AbstractDuplexChannel::~AbstractDuplexChannel() {
+ free(copy_buf);
+ copy_buf = NULL;
+ copy_buf_length = 0;
+
+ LOG4CXX_INFO(logger, "Destroying DuplexChannel(" << this << ")");
}
-/*static*/ void DuplexChannel::connectCallbackHandler(DuplexChannelPtr channel,
- const boost::system::error_code& error) {
- LOG4CXX_DEBUG(logger,"DuplexChannel::connectCallbackHandler error(" << error
- << ") channel(" << channel.get() << ")");
+/*static*/ void AbstractDuplexChannel::connectCallbackHandler(
+ AbstractDuplexChannelPtr channel,
+ OperationCallbackPtr callback,
+ const boost::system::error_code& error) {
+ channel->doAfterConnect(callback, error);
+}
+void AbstractDuplexChannel::connect() {
+ connect(OperationCallbackPtr());
+}
+
+void AbstractDuplexChannel::connect(const OperationCallbackPtr& callback) {
+ setState(CONNECTING);
+ doConnect(callback);
+}
+
+void AbstractDuplexChannel::doAfterConnect(const OperationCallbackPtr& callback,
+ const boost::system::error_code& error) {
if (error) {
- channel->channelDisconnected(ChannelConnectException());
- channel->setState(DEAD);
+ LOG4CXX_ERROR(logger, "Channel " << this << " connect error : " << error.message().c_str());
+ channelConnectFailed(ChannelConnectException(), callback);
return;
}
- channel->setState(CONNECTED);
-
+ // set no delay option
boost::system::error_code ec;
- boost::asio::ip::tcp::no_delay option(true);
-
- channel->socket.set_option(option, ec);
+ setSocketOption(ec);
if (ec) {
- channel->channelDisconnected(ChannelSetupException());
- channel->setState(DEAD);
+ LOG4CXX_WARN(logger, "Channel " << this << " set up socket error : " << ec.message().c_str());
+ channelConnectFailed(ChannelSetupException(), callback);
return;
}
-
- channel->startSending();
- channel->startReceiving();
-}
-void DuplexChannel::connect() {
- setState(CONNECTING);
+ boost::asio::ip::tcp::endpoint localEp;
+ boost::asio::ip::tcp::endpoint remoteEp;
+ localEp = getLocalAddress(ec);
+ remoteEp = getRemoteAddress(ec);
+
+ if (!ec) {
+ LOG4CXX_INFO(logger, "Channel " << this << " connected :"
+ << localEp.address().to_string() << ":" << localEp.port() << "=>"
+ << remoteEp.address().to_string() << ":" << remoteEp.port());
+ // update ip address since it might connect to a VIP
+ address.updateIP(remoteEp.address().to_v4().to_ulong());
+ }
+ // the channel is connected
+ channelConnected(callback);
+}
+
+void AbstractDuplexChannel::channelConnectFailed(const std::exception& e,
+ const OperationCallbackPtr& callback) {
+ channelDisconnected(e);
+ setState(DEAD);
+ if (callback.get()) {
+ callback->operationFailed(e);
+ }
+}
- boost::asio::ip::tcp::endpoint endp(boost::asio::ip::address_v4(address.ip()), address.port());
- boost::system::error_code error = boost::asio::error::host_not_found;
+void AbstractDuplexChannel::channelConnected(const OperationCallbackPtr& callback) {
+ // for a normal channel, we are done here
+ setState(CONNECTED);
+ if (callback.get()) {
+ callback->operationComplete();
+ }
- socket.async_connect(endp, boost::bind(&DuplexChannel::connectCallbackHandler,
- shared_from_this(),
- boost::asio::placeholders::error));
+ // enable sending & receiving
+ startSending();
+ startReceiving();
}
-/*static*/ void DuplexChannel::messageReadCallbackHandler(DuplexChannelPtr channel,
- std::size_t message_size,
- const boost::system::error_code& error,
- std::size_t bytes_transferred) {
+/*static*/ void AbstractDuplexChannel::messageReadCallbackHandler(
+ AbstractDuplexChannelPtr channel,
+ std::size_t message_size,
+ const boost::system::error_code& error,
+ std::size_t bytes_transferred) {
LOG4CXX_DEBUG(logger, "DuplexChannel::messageReadCallbackHandler " << error << ", "
- << bytes_transferred << " channel(" << channel.get() << ")");
-
+ << bytes_transferred << " channel(" << channel.get() << ")");
+
if (error) {
- LOG4CXX_ERROR(logger, "Invalid read error (" << error << ") bytes_transferred ("
- << bytes_transferred << ") channel(" << channel.get() << ")");
+ if (!channel->isClosed()) {
+ LOG4CXX_INFO(logger, "Invalid read error (" << error << ") bytes_transferred ("
+ << bytes_transferred << ") channel(" << channel.get() << ")");
+ }
channel->channelDisconnected(ChannelReadException());
return;
}
@@ -114,6 +159,10 @@ void DuplexChannel::connect() {
channel->copy_buf = (char*)realloc(channel->copy_buf, channel->copy_buf_length);
if (channel->copy_buf == NULL) {
LOG4CXX_ERROR(logger, "Error allocating buffer. channel(" << channel.get() << ")");
+ // if we failed to realloc memory, we should disconnect the channel.
+ // it then enters the disconnect logic, which closes the channel and releases
+ // its resources, including the copy_buf memory.
+ channel->channelDisconnected(ChannelOutOfMemoryException());
return;
}
}
@@ -122,15 +171,13 @@ void DuplexChannel::connect() {
PubSubResponsePtr response(new PubSubResponse());
bool err = response->ParseFromArray(channel->copy_buf, message_size);
-
if (!err) {
LOG4CXX_ERROR(logger, "Error parsing message. channel(" << channel.get() << ")");
-
channel->channelDisconnected(ChannelReadException());
return;
} else {
LOG4CXX_DEBUG(logger, "channel(" << channel.get() << ") : " << channel->in_buf.size()
- << " bytes left in buffer");
+ << " bytes left in buffer");
}
ChannelHandlerPtr h;
@@ -159,25 +206,28 @@ void DuplexChannel::connect() {
h->messageReceived(channel, response);
}
- DuplexChannel::readSize(channel);
+ AbstractDuplexChannel::readSize(channel);
}
-/*static*/ void DuplexChannel::sizeReadCallbackHandler(DuplexChannelPtr channel,
- const boost::system::error_code& error,
- std::size_t bytes_transferred) {
+/*static*/ void AbstractDuplexChannel::sizeReadCallbackHandler(
+ AbstractDuplexChannelPtr channel,
+ const boost::system::error_code& error,
+ std::size_t bytes_transferred) {
LOG4CXX_DEBUG(logger, "DuplexChannel::sizeReadCallbackHandler " << error << ", "
- << bytes_transferred << " channel(" << channel.get() << ")");
+ << bytes_transferred << " channel(" << channel.get() << ")");
if (error) {
- LOG4CXX_ERROR(logger, "Invalid read error (" << error << ") bytes_transferred ("
- << bytes_transferred << ") channel(" << channel.get() << ")");
+ if (!channel->isClosed()) {
+ LOG4CXX_INFO(logger, "Invalid read error (" << error << ") bytes_transferred ("
+ << bytes_transferred << ") channel(" << channel.get() << ")");
+ }
channel->channelDisconnected(ChannelReadException());
return;
}
if (channel->in_buf.size() < sizeof(uint32_t)) {
LOG4CXX_ERROR(logger, "Not enough data in stream. Must have been an error reading. "
- << " Closing channel(" << channel.get() << ")");
+ << " Closing channel(" << channel.get() << ")");
channel->channelDisconnected(ChannelReadException());
return;
}
@@ -189,40 +239,30 @@ void DuplexChannel::connect() {
int toread = size - channel->in_buf.size();
LOG4CXX_DEBUG(logger, " size of incoming message " << size << ", currently in buffer "
- << channel->in_buf.size() << " channel(" << channel.get() << ")");
+ << channel->in_buf.size() << " channel(" << channel.get() << ")");
if (toread <= 0) {
- DuplexChannel::messageReadCallbackHandler(channel, size, error, 0);
+ AbstractDuplexChannel::messageReadCallbackHandler(channel, size, error, 0);
} else {
- boost::asio::async_read(channel->socket, channel->in_buf,
- boost::asio::transfer_at_least(toread),
- boost::bind(&DuplexChannel::messageReadCallbackHandler,
- channel, size,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
+ channel->readMsgBody(channel->in_buf, toread, size);
}
}
-/*static*/ void DuplexChannel::readSize(DuplexChannelPtr channel) {
+/*static*/ void AbstractDuplexChannel::readSize(AbstractDuplexChannelPtr channel) {
int toread = sizeof(uint32_t) - channel->in_buf.size();
LOG4CXX_DEBUG(logger, " size of incoming message " << sizeof(uint32_t)
- << ", currently in buffer " << channel->in_buf.size()
- << " channel(" << channel.get() << ")");
+ << ", currently in buffer " << channel->in_buf.size()
+ << " channel(" << channel.get() << ")");
if (toread < 0) {
- DuplexChannel::sizeReadCallbackHandler(channel, boost::system::error_code(), 0);
+ AbstractDuplexChannel::sizeReadCallbackHandler(channel, boost::system::error_code(), 0);
} else {
- // in_buf_size.prepare(sizeof(uint32_t));
- boost::asio::async_read(channel->socket, channel->in_buf,
- boost::asio::transfer_at_least(sizeof(uint32_t)),
- boost::bind(&DuplexChannel::sizeReadCallbackHandler,
- channel,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
+ channel->readMsgSize(channel->in_buf);
}
}
-void DuplexChannel::startReceiving() {
- LOG4CXX_DEBUG(logger, "DuplexChannel::startReceiving channel(" << this << ") currently receiving = " << receiving);
+void AbstractDuplexChannel::startReceiving() {
+ LOG4CXX_DEBUG(logger, "DuplexChannel::startReceiving channel(" << this
+ << ") currently receiving = " << receiving);
PubSubResponsePtr response;
bool inReadingState;
@@ -271,22 +311,26 @@ void DuplexChannel::startReceiving() {
// if channel is not in reading state, #readSize
if (!inReadingState) {
- DuplexChannel::readSize(shared_from_this());
+ AbstractDuplexChannel::readSize(shared_from_this());
}
}
-bool DuplexChannel::isReceiving() {
+bool AbstractDuplexChannel::isReceiving() {
return receiving;
}
-void DuplexChannel::stopReceiving() {
+bool AbstractDuplexChannel::isClosed() {
+ return closed;
+}
+
+void AbstractDuplexChannel::stopReceiving() {
LOG4CXX_DEBUG(logger, "DuplexChannel::stopReceiving channel(" << this << ")");
boost::lock_guard<boost::mutex> lock(receiving_lock);
receiving = false;
}
-void DuplexChannel::startSending() {
+void AbstractDuplexChannel::startSending() {
{
boost::shared_lock<boost::shared_mutex> lock(state_lock);
if (state != CONNECTED) {
@@ -298,7 +342,7 @@ void DuplexChannel::startSending() {
if (sending) {
return;
}
- LOG4CXX_DEBUG(logger, "DuplexChannel::startSending channel(" << this << ")");
+ LOG4CXX_DEBUG(logger, "AbstractDuplexChannel::startSending channel(" << this << ")");
WriteRequest w;
{
@@ -323,20 +367,14 @@ void DuplexChannel::startSending() {
return;
}
- boost::asio::async_write(socket, out_buf,
- boost::bind(&DuplexChannel::writeCallbackHandler,
- shared_from_this(),
- w.second,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
+ writeBuffer(out_buf, w.second);
}
-
-const HostAddress& DuplexChannel::getHostAddress() const {
+const HostAddress& AbstractDuplexChannel::getHostAddress() const {
return address;
}
-void DuplexChannel::channelDisconnected(const std::exception& e) {
+void AbstractDuplexChannel::channelDisconnected(const std::exception& e) {
setState(DEAD);
{
@@ -360,54 +398,39 @@ void DuplexChannel::channelDisconnected(
}
}
-void DuplexChannel::kill() {
- LOG4CXX_DEBUG(logger, "Killing duplex channel (" << this << ")");
-
- bool connected = false;
+void AbstractDuplexChannel::close() {
{
boost::shared_lock<boost::shared_mutex> statelock(state_lock);
- connected = (state == CONNECTING || state == CONNECTED);
+ state = DEAD;
}
- boost::lock_guard<boost::shared_mutex> lock(destruction_lock);
- if (connected) {
- setState(DEAD);
-
- boost::system::error_code ec;
- socket.cancel(ec);
- if (ec) {
- LOG4CXX_WARN(logger, "Channel " << this << " canceling io error : " << ec.message().c_str());
- }
- socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
- if (ec) {
- LOG4CXX_WARN(logger, "Channel " << this << " shutdown error : " << ec.message().c_str());
- }
- socket.close(ec);
- if (ec) {
- LOG4CXX_WARN(logger, "Channel " << this << " close error : " << ec.message().c_str());
+ {
+ boost::lock_guard<boost::shared_mutex> lock(destruction_lock);
+ if (closed) {
+ // someone has already closed the socket.
+ return;
}
+ closed = true;
+ handler = ChannelHandlerPtr(); // clear the handler in case it ever referenced the channel*/
}
- handler = ChannelHandlerPtr(); // clear the handler in case it ever referenced the channel*/
-}
-DuplexChannel::~DuplexChannel() {
- /** If we are going away, fail all transactions that haven't been completed */
- failAllTransactions();
- kill();
- free(copy_buf);
- copy_buf = NULL;
- copy_buf_length = 0;
+ LOG4CXX_INFO(logger, "Killing duplex channel (" << this << ")");
- LOG4CXX_DEBUG(logger, "Destroying DuplexChannel(" << this << ")");
+ // If we are going away, fail all transactions that haven't been completed
+ failAllTransactions();
+ closeSocket();
}
-/*static*/ void DuplexChannel::writeCallbackHandler(DuplexChannelPtr channel, OperationCallbackPtr callback,
- const boost::system::error_code& error,
- std::size_t bytes_transferred) {
- LOG4CXX_DEBUG(logger, "DuplexChannel::writeCallbackHandler " << error << ", "
- << bytes_transferred << " channel(" << channel.get() << ")");
-
+/*static*/ void AbstractDuplexChannel::writeCallbackHandler(
+ AbstractDuplexChannelPtr channel,
+ OperationCallbackPtr callback,
+ const boost::system::error_code& error,
+ std::size_t bytes_transferred) {
if (error) {
+ if (!channel->isClosed()) {
+ LOG4CXX_DEBUG(logger, "AbstractDuplexChannel::writeCallbackHandler " << error << ", "
+ << bytes_transferred << " channel(" << channel.get() << ")");
+ }
callback->operationFailed(ChannelWriteException());
channel->channelDisconnected(ChannelWriteException());
return;
@@ -425,17 +448,15 @@ DuplexChannel::~DuplexChannel() {
channel->startSending();
}
-void DuplexChannel::writeRequest(const PubSubRequestPtr& m, const OperationCallbackPtr& callback) {
- LOG4CXX_DEBUG(logger, "DuplexChannel::writeRequest channel(" << this << ") txnid("
- << m->txnid() << ") shouldClaim("<< m->has_shouldclaim() << ", "
- << m->shouldclaim() << ")");
-
+void AbstractDuplexChannel::writeRequest(const PubSubRequestPtr& m,
+ const OperationCallbackPtr& callback) {
{
boost::shared_lock<boost::shared_mutex> lock(state_lock);
if (state != CONNECTED && state != CONNECTING) {
LOG4CXX_ERROR(logger,"Tried to write transaction [" << m->txnid() << "] to a channel ["
- << this << "] which is " << (state == DEAD ? "DEAD" : "UNINITIALISED"));
+ << this << "] which is " << (state == DEAD ? "DEAD" : "UNINITIALISED"));
callback->operationFailed(UninitialisedChannelException());
+ return;
}
}
@@ -448,10 +469,14 @@ void DuplexChannel::writeRequest(const P
startSending();
}
+//
+// Transaction operations
+//
+
/**
Store the transaction data for a request.
*/
-void DuplexChannel::storeTransaction(const PubSubDataPtr& data) {
+void AbstractDuplexChannel::storeTransaction(const PubSubDataPtr& data) {
LOG4CXX_DEBUG(logger, "Storing txnid(" << data->getTxnId() << ") for channel(" << this << ")");
boost::lock_guard<boost::mutex> lock(txnid2data_lock);
@@ -461,20 +486,20 @@ void DuplexChannel::storeTransaction(con
/**
Give the transaction back to the caller.
*/
-PubSubDataPtr DuplexChannel::retrieveTransaction(long txnid) {
+PubSubDataPtr AbstractDuplexChannel::retrieveTransaction(long txnid) {
boost::lock_guard<boost::mutex> lock(txnid2data_lock);
PubSubDataPtr data = txnid2data[txnid];
txnid2data.erase(txnid);
if (data == NULL) {
LOG4CXX_ERROR(logger, "Transaction txnid(" << txnid
- << ") doesn't exist in channel (" << this << ")");
+ << ") doesn't exist in channel (" << this << ")");
}
return data;
}
-void DuplexChannel::failAllTransactions() {
+void AbstractDuplexChannel::failAllTransactions() {
boost::lock_guard<boost::mutex> lock(txnid2data_lock);
for (TransactionMap::iterator iter = txnid2data.begin(); iter != txnid2data.end(); ++iter) {
PubSubDataPtr& data = (*iter).second;
@@ -483,7 +508,322 @@ void DuplexChannel::failAllTransactions(
txnid2data.clear();
}
-void DuplexChannel::setState(State s) {
+// Set state for the channel
+void AbstractDuplexChannel::setState(State s) {
boost::lock_guard<boost::shared_mutex> lock(state_lock);
state = s;
}
+
+//
+// Basic Asio Channel Implementation
+//
+
+AsioDuplexChannel::AsioDuplexChannel(IOServicePtr& service,
+ const HostAddress& addr,
+ const ChannelHandlerPtr& handler)
+ : AbstractDuplexChannel(service, addr, handler) {
+ this->socket = boost_socket_ptr(new boost_socket(getService()));
+ LOG4CXX_DEBUG(logger, "Creating DuplexChannel(" << this << ")");
+}
+
+AsioDuplexChannel::~AsioDuplexChannel() {
+}
+
+void AsioDuplexChannel::doConnect(const OperationCallbackPtr& callback) {
+ boost::system::error_code error = boost::asio::error::host_not_found;
+ uint32_t ip2conn = address.ip();
+ uint16_t port2conn = address.port();
+ boost::asio::ip::tcp::endpoint endp(boost::asio::ip::address_v4(ip2conn), port2conn);
+
+ socket->async_connect(endp, boost::bind(&AbstractDuplexChannel::connectCallbackHandler,
+ shared_from_this(), callback,
+ boost::asio::placeholders::error));
+ LOG4CXX_INFO(logger, "Channel (" << this << ") fire connect operation to ip ("
+ << ip2conn << ") port (" << port2conn << ")");
+}
+
+void AsioDuplexChannel::setSocketOption(boost::system::error_code& ec) {
+ boost::asio::ip::tcp::no_delay option(true);
+ socket->set_option(option, ec);
+}
+
+boost::asio::ip::tcp::endpoint AsioDuplexChannel::getLocalAddress(
+ boost::system::error_code& ec) {
+ return socket->local_endpoint(ec);
+}
+
+boost::asio::ip::tcp::endpoint AsioDuplexChannel::getRemoteAddress(
+ boost::system::error_code& ec) {
+ return socket->remote_endpoint(ec);
+}
+
+void AsioDuplexChannel::writeBuffer(boost::asio::streambuf& buffer,
+ const OperationCallbackPtr& callback) {
+ boost::asio::async_write(*socket, buffer,
+ boost::bind(&AbstractDuplexChannel::writeCallbackHandler,
+ shared_from_this(), callback,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred)
+ );
+}
+
+void AsioDuplexChannel::readMsgSize(boost::asio::streambuf& buffer) {
+ boost::asio::async_read(*socket, buffer, boost::asio::transfer_at_least(sizeof(uint32_t)),
+ boost::bind(&AbstractDuplexChannel::sizeReadCallbackHandler,
+ shared_from_this(),
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
+}
+
+void AsioDuplexChannel::readMsgBody(boost::asio::streambuf& buffer,
+ int toread, uint32_t msgSize) {
+ boost::asio::async_read(*socket, buffer, boost::asio::transfer_at_least(toread),
+ boost::bind(&AbstractDuplexChannel::messageReadCallbackHandler,
+ shared_from_this(), msgSize,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
+}
+
+void AsioDuplexChannel::closeSocket() {
+ boost::system::error_code ec;
+
+ socket->cancel(ec);
+ if (ec) {
+ LOG4CXX_WARN(logger, "Channel " << this << " canceling io error : " << ec.message().c_str());
+ }
+
+ socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
+ if (ec) {
+ LOG4CXX_WARN(logger, "Channel " << this << " shutdown error : " << ec.message().c_str());
+ }
+
+ socket->close(ec);
+ if (ec) {
+ LOG4CXX_WARN(logger, "Channel " << this << " close error : " << ec.message().c_str());
+ }
+}
+
+// SSL Context Factory
+
+SSLContextFactory::SSLContextFactory(const Configuration& conf)
+ : conf(conf),
+ sslPemFile(conf.get(Configuration::SSL_PEM_FILE,
+ DEFAULT_SSL_PEM_FILE)) {
+}
+
+SSLContextFactory::~SSLContextFactory() {}
+
+boost_ssl_context_ptr SSLContextFactory::createSSLContext(boost::asio::io_service& service) {
+ boost_ssl_context_ptr sslCtx(new boost_ssl_context(service,
+ boost::asio::ssl::context::sslv23_client));
+ sslCtx->set_verify_mode(boost::asio::ssl::context::verify_none);
+ if (!sslPemFile.empty()) {
+ boost::system::error_code err;
+ sslCtx->load_verify_file(sslPemFile, err);
+
+ if (err) {
+ LOG4CXX_ERROR(logger, "Failed to load verify ssl pem file : "
+ << sslPemFile);
+ throw InvalidSSLPermFileException();
+ }
+ }
+ return sslCtx;
+}
+
+//
+// SSL Channel Implementation
+//
+
+#ifndef __APPLE__
+AsioSSLDuplexChannel::AsioSSLDuplexChannel(IOServicePtr& service,
+ const boost_ssl_context_ptr& sslCtx,
+ const HostAddress& addr,
+ const ChannelHandlerPtr& handler)
+ : AbstractDuplexChannel(service, addr, handler), ssl_ctx(sslCtx),
+ sslclosed(false) {
+#else
+AsioSSLDuplexChannel::AsioSSLDuplexChannel(IOServicePtr& service,
+ const boost_ssl_context_ptr& sslCtx,
+ const HostAddress& addr,
+ const ChannelHandlerPtr& handler)
+ : AbstractDuplexChannel(service, addr, handler), ssl_ctx(sslCtx) {
+#endif
+ ssl_socket = boost_ssl_socket_ptr(new boost_ssl_socket(getService(), *ssl_ctx));
+ LOG4CXX_DEBUG(logger, "Created SSL DuplexChannel(" << this << ")");
+}
+
+AsioSSLDuplexChannel::~AsioSSLDuplexChannel() {
+}
+
+void AsioSSLDuplexChannel::doConnect(const OperationCallbackPtr& callback) {
+ boost::system::error_code error = boost::asio::error::host_not_found;
+ uint32_t ip2conn = address.ip();
+ uint16_t port2conn = address.sslPort();
+ boost::asio::ip::tcp::endpoint endp(boost::asio::ip::address_v4(ip2conn), port2conn);
+
+ ssl_socket->lowest_layer().async_connect(endp,
+ boost::bind(&AbstractDuplexChannel::connectCallbackHandler,
+ shared_from_this(), callback,
+ boost::asio::placeholders::error));
+ LOG4CXX_INFO(logger, "SSL Channel (" << this << ") fire connect operation to ip ("
+ << ip2conn << ") port (" << port2conn << ")");
+}
+
+void AsioSSLDuplexChannel::setSocketOption(boost::system::error_code& ec) {
+ boost::asio::ip::tcp::no_delay option(true);
+ ssl_socket->lowest_layer().set_option(option, ec);
+}
+
+boost::asio::ip::tcp::endpoint AsioSSLDuplexChannel::getLocalAddress(
+ boost::system::error_code& ec) {
+ return ssl_socket->lowest_layer().local_endpoint(ec);
+}
+
+boost::asio::ip::tcp::endpoint AsioSSLDuplexChannel::getRemoteAddress(
+ boost::system::error_code& ec) {
+ return ssl_socket->lowest_layer().remote_endpoint(ec);
+}
+
+void AsioSSLDuplexChannel::channelConnected(const OperationCallbackPtr& callback) {
+ // for an SSL channel, we have to do the SSL handshake first
+ startHandShake(callback);
+ LOG4CXX_INFO(logger, "SSL Channel " << this << " fire hand shake operation");
+}
+
+void AsioSSLDuplexChannel::sslChannelConnected(const OperationCallbackPtr& callback) {
+ LOG4CXX_INFO(logger, "SSL Channel " << this << " hand shake finish!!");
+ AbstractDuplexChannel::channelConnected(callback);
+}
+
+void AsioSSLDuplexChannel::startHandShake(const OperationCallbackPtr& callback) {
+ ssl_socket->async_handshake(boost::asio::ssl::stream_base::client,
+ boost::bind(&AsioSSLDuplexChannel::handleHandshake,
+ boost::shared_dynamic_cast<AsioSSLDuplexChannel>(shared_from_this()),
+ callback, boost::asio::placeholders::error));
+}
+
+void AsioSSLDuplexChannel::handleHandshake(AsioSSLDuplexChannelPtr channel,
+ OperationCallbackPtr callback,
+ const boost::system::error_code& error) {
+ if (error) {
+ LOG4CXX_ERROR(logger, "SSL Channel " << channel.get() << " hand shake error : "
+ << error.message().c_str());
+ channel->channelConnectFailed(ChannelConnectException(), callback);
+ return;
+ }
+ channel->sslChannelConnected(callback);
+}
+
+void AsioSSLDuplexChannel::writeBuffer(boost::asio::streambuf& buffer,
+ const OperationCallbackPtr& callback) {
+ boost::asio::async_write(*ssl_socket, buffer,
+ boost::bind(&AbstractDuplexChannel::writeCallbackHandler,
+ shared_from_this(), callback,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred)
+ );
+}
+
+void AsioSSLDuplexChannel::readMsgSize(boost::asio::streambuf& buffer) {
+ boost::asio::async_read(*ssl_socket, buffer, boost::asio::transfer_at_least(sizeof(uint32_t)),
+ boost::bind(&AbstractDuplexChannel::sizeReadCallbackHandler,
+ shared_from_this(),
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
+}
+
+void AsioSSLDuplexChannel::readMsgBody(boost::asio::streambuf& buffer,
+ int toread, uint32_t msgSize) {
+ boost::asio::async_read(*ssl_socket, buffer, boost::asio::transfer_at_least(toread),
+ boost::bind(&AbstractDuplexChannel::messageReadCallbackHandler,
+ shared_from_this(), msgSize,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
+}
+
+#ifndef __APPLE__
+// boost asio doesn't provide a timeout mechanism for shutting down ssl
+void AsioSSLDuplexChannel::sslShutdown() {
+ ssl_socket->async_shutdown(boost::bind(&AsioSSLDuplexChannel::handleSSLShutdown,
+ boost::shared_dynamic_cast<AsioSSLDuplexChannel>(shared_from_this()),
+ boost::asio::placeholders::error));
+}
+
+void AsioSSLDuplexChannel::handleSSLShutdown(const boost::system::error_code& error) {
+ if (error) {
+ LOG4CXX_ERROR(logger, "SSL Channel " << this << " shutdown error : "
+ << error.message().c_str());
+ }
+ {
+ boost::lock_guard<boost::mutex> lock(sslclosed_lock);
+ sslclosed = true;
+ }
+ sslclosed_cond.notify_all();
+}
+#endif
+
+void AsioSSLDuplexChannel::closeSocket() {
+#ifndef __APPLE__
+ // Shutdown ssl
+ sslShutdown();
+ // time wait
+ {
+ boost::mutex::scoped_lock lock(sslclosed_lock);
+ if (!sslclosed) {
+ sslclosed_cond.timed_wait(lock, boost::posix_time::milliseconds(1000));
+ }
+ }
+#endif
+ closeLowestLayer();
+}
+
+void AsioSSLDuplexChannel::closeLowestLayer() {
+ boost::system::error_code ec;
+
+ ssl_socket->lowest_layer().cancel(ec);
+ if (ec) {
+ LOG4CXX_WARN(logger, "Channel " << this << " canceling io error : " << ec.message().c_str());
+ }
+
+ ssl_socket->lowest_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
+ if (ec) {
+ LOG4CXX_WARN(logger, "Channel " << this << " shutdown error : " << ec.message().c_str());
+ }
+
+ ssl_socket->lowest_layer().close(ec);
+ if (ec) {
+ LOG4CXX_WARN(logger, "Channel " << this << " close error : " << ec.message().c_str());
+ }
+}
+
+DuplexChannelManagerPtr DuplexChannelManager::create(const Configuration& conf,
+ EventDispatcher& dispatcher) {
+ DuplexChannelManagerPtr factory(new DuplexChannelManager(conf, dispatcher));
+ LOG4CXX_DEBUG(logger, "Created DuplexChannelManager " << factory);
+ return factory;
+}
+
+DuplexChannelManager::DuplexChannelManager(const Configuration& conf,
+ EventDispatcher& dispatcher)
+ : conf(conf), dispatcher(dispatcher) {
+ sslEnabled = conf.getBool(Configuration::RUN_AS_SSL_MODE, DEFAULT_SSL_ENABLED);
+ if (sslEnabled) {
+ sslCtxFactory = SSLContextFactoryPtr(new SSLContextFactory(conf));
+ }
+}
+
+DuplexChannelManager::~DuplexChannelManager() {
+}
+
+DuplexChannelPtr DuplexChannelManager::createChannel(const HostAddress& addr,
+ const ChannelHandlerPtr& handler) {
+ LOG4CXX_DEBUG(logger, "Creating channel with handler " << handler.get());
+ IOServicePtr& service = dispatcher.getService();
+ if (sslEnabled) {
+ boost_ssl_context_ptr sslCtx =
+ sslCtxFactory->createSSLContext(service->getService());
+ return DuplexChannelPtr(new AsioSSLDuplexChannel(service, sslCtx, addr, handler));
+ } else {
+ return DuplexChannelPtr(new AsioDuplexChannel(service, addr, handler));
+ }
+}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/channel.h Mon Oct 1 13:36:31 2012
@@ -40,6 +40,8 @@
#include <boost/asio.hpp>
#include <boost/asio/ip/tcp.hpp>
+#include <boost/asio/ssl.hpp>
+#include <boost/function.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/shared_mutex.hpp>
@@ -57,9 +59,16 @@ namespace Hedwig {
class ChannelWriteException : public ChannelException {};
class ChannelReadException : public ChannelException {};
class ChannelThreadException : public ChannelException {};
+ class ChannelOutOfMemoryException : public ChannelException {};
+
+ class InvalidSSLPermFileException : public std::exception {};
class DuplexChannel;
typedef boost::shared_ptr<DuplexChannel> DuplexChannelPtr;
+ typedef boost::asio::ip::tcp::socket boost_socket;
+ typedef boost::shared_ptr<boost_socket> boost_socket_ptr;
+ typedef boost::asio::ssl::stream<boost_socket> boost_ssl_socket;
+ typedef boost::shared_ptr<boost_ssl_socket> boost_ssl_socket_ptr;
class ChannelHandler {
public:
@@ -74,90 +83,364 @@ namespace Hedwig {
typedef boost::shared_ptr<ChannelHandler> ChannelHandlerPtr;
+ // A channel interface to send requests
+ class DuplexChannel {
+ public:
+ virtual ~DuplexChannel() {}
+
+ // Issues a connect request to the target host
+ // Users may call writeRequest after issuing the connect request; those requests
+ // should be buffered and written once the channel is connected.
+ virtual void connect() = 0;
+
+ // Issues a connect request to the target host
+ // Users may call writeRequest after issuing the connect request; those requests
+ // should be buffered and written once the channel is connected.
+ // The provided callback is triggered once the channel is connected.
+ virtual void connect(const OperationCallbackPtr& callback) = 0;
+
+ // Write the request to underlying channel
+ // If the channel is not yet established, all write requests are buffered
+ // until the channel is connected.
+ virtual void writeRequest(const PubSubRequestPtr& m,
+ const OperationCallbackPtr& callback) = 0;
+
+ // Returns the remote address that this channel is connected to.
+ virtual const HostAddress& getHostAddress() const = 0;
+
+ // Resumes the read operation of this channel asynchronously
+ virtual void startReceiving() = 0;
+
+ // Suspends the read operation of this channel asynchronously
+ virtual void stopReceiving() = 0;
+
+ // Returns true if and only if the channel will read a message
+ virtual bool isReceiving() = 0;
+
+ //
+ // Transaction operations
+ //
+
+ // Store a pub/sub request
+ virtual void storeTransaction(const PubSubDataPtr& data) = 0;
+
+ // Remove a pub/sub request
+ virtual PubSubDataPtr retrieveTransaction(long txnid) = 0;
+
+ // Fail all transactions
+ virtual void failAllTransactions() = 0;
+
+ // Handle the case where the channel is disconnected due to issues found
+ // when reading or writing
+ virtual void channelDisconnected(const std::exception& e) = 0;
+
+ // Close the channel to release the resources
+ // Once a channel is closed, it cannot be opened again. Calling this
+ // method on a closed channel has no effect.
+ virtual void close() = 0;
+ };
- class DuplexChannel : public boost::enable_shared_from_this<DuplexChannel> {
+ typedef boost::asio::ssl::context boost_ssl_context;
+ typedef boost::shared_ptr<boost_ssl_context> boost_ssl_context_ptr;
+
+ class SSLContextFactory {
public:
- DuplexChannel(EventDispatcher& dispatcher, const HostAddress& addr,
- const Configuration& cfg, const ChannelHandlerPtr& handler);
- static void connectCallbackHandler(DuplexChannelPtr channel,
- const boost::system::error_code& error);
- void connect();
-
- static void writeCallbackHandler(DuplexChannelPtr channel, OperationCallbackPtr callback,
- const boost::system::error_code& error,
- std::size_t bytes_transferred);
- void writeRequest(const PubSubRequestPtr& m, const OperationCallbackPtr& callback);
-
- const HostAddress& getHostAddress() const;
-
- void storeTransaction(const PubSubDataPtr& data);
- PubSubDataPtr retrieveTransaction(long txnid);
- void failAllTransactions();
-
- static void sizeReadCallbackHandler(DuplexChannelPtr channel,
- const boost::system::error_code& error,
- std::size_t bytes_transferred);
- static void messageReadCallbackHandler(DuplexChannelPtr channel, std::size_t messagesize,
- const boost::system::error_code& error,
- std::size_t bytes_transferred);
- static void readSize(DuplexChannelPtr channel);
-
- void startReceiving();
- bool isReceiving();
- void stopReceiving();
-
- void startSending();
+ SSLContextFactory(const Configuration& conf);
+ ~SSLContextFactory();
+
+ boost_ssl_context_ptr createSSLContext(boost::asio::io_service& service);
+ private:
+ const Configuration& conf;
+ std::string sslPemFile;
+ };
+
+ typedef boost::shared_ptr<SSLContextFactory> SSLContextFactoryPtr;
+
+ class DuplexChannelManager;
+ typedef boost::shared_ptr<DuplexChannelManager> DuplexChannelManagerPtr;
- void channelDisconnected(const std::exception& e);
- virtual void kill();
+ class DuplexChannelManager : public boost::enable_shared_from_this<DuplexChannelManager> {
+ public:
+ static DuplexChannelManagerPtr create(const Configuration& conf,
+ EventDispatcher& dispatcher);
+ ~DuplexChannelManager();
+
+ DuplexChannelPtr createChannel(const HostAddress& addr, const ChannelHandlerPtr& handler);
+ private:
+ DuplexChannelManager(const Configuration& conf, EventDispatcher& dispatcher);
- inline boost::asio::io_service & getService() {
+ const Configuration& conf;
+ EventDispatcher& dispatcher;
+ bool sslEnabled;
+ SSLContextFactoryPtr sslCtxFactory;
+ };
+
+ class AbstractDuplexChannel;
+ typedef boost::shared_ptr<AbstractDuplexChannel> AbstractDuplexChannelPtr;
+
+ class AbstractDuplexChannel : public DuplexChannel,
+ public boost::enable_shared_from_this<AbstractDuplexChannel> {
+ public:
+ AbstractDuplexChannel(IOServicePtr& service,
+ const HostAddress& addr,
+ const ChannelHandlerPtr& handler);
+ virtual ~AbstractDuplexChannel();
+
+ //
+ // Connect Operation
+ //
+
+ // Asio Connect Callback Handler
+ static void connectCallbackHandler(AbstractDuplexChannelPtr channel,
+ OperationCallbackPtr callback,
+ const boost::system::error_code& error);
+ virtual void connect();
+ virtual void connect(const OperationCallbackPtr& callback);
+
+ //
+ // Write Operation
+ //
+
+ // Asio Write Callback Handler
+ static void writeCallbackHandler(AbstractDuplexChannelPtr channel,
+ OperationCallbackPtr callback,
+ const boost::system::error_code& error,
+ std::size_t bytes_transferred);
+ // Write request
+ virtual void writeRequest(const PubSubRequestPtr& m,
+ const OperationCallbackPtr& callback);
+
+ // get the target host
+ virtual const HostAddress& getHostAddress() const;
+
+ static void sizeReadCallbackHandler(AbstractDuplexChannelPtr channel,
+ const boost::system::error_code& error,
+ std::size_t bytes_transferred);
+ static void messageReadCallbackHandler(AbstractDuplexChannelPtr channel,
+ std::size_t messagesize,
+ const boost::system::error_code& error,
+ std::size_t bytes_transferred);
+ static void readSize(AbstractDuplexChannelPtr channel);
+
+ // start receiving responses from underlying channel
+ virtual void startReceiving();
+ // is the underlying channel in receiving state
+ virtual bool isReceiving();
+ // stop receiving responses from underlying channel
+ virtual void stopReceiving();
+
+ // Store a pub/sub request
+ virtual void storeTransaction(const PubSubDataPtr& data);
+
+ // Remove a pub/sub request
+ virtual PubSubDataPtr retrieveTransaction(long txnid);
+
+ // Fail all transactions
+ virtual void failAllTransactions();
+
+ // channel is disconnected for a specified exception
+ virtual void channelDisconnected(const std::exception& e);
+
+ // close the channel
+ virtual void close();
+
+ inline boost::asio::io_service & getService() const {
return service;
}
- virtual ~DuplexChannel();
- private:
- enum State { UNINITIALISED, CONNECTING, CONNECTED, DEAD };
+ protected:
+ // execute the connect operation
+ virtual void doConnect(const OperationCallbackPtr& callback) = 0;
+
+ virtual void doAfterConnect(const OperationCallbackPtr& callback,
+ const boost::system::error_code& error);
+
+ // Execute the action after channel connect
+ // It would be executed in asio connect callback handler
+ virtual void setSocketOption(boost::system::error_code& ec) = 0;
+ virtual boost::asio::ip::tcp::endpoint
+ getRemoteAddress(boost::system::error_code& ec) = 0;
+ virtual boost::asio::ip::tcp::endpoint
+ getLocalAddress(boost::system::error_code& ec) = 0;
+
+ // Channel failed to connect
+ virtual void channelConnectFailed(const std::exception& e,
+ const OperationCallbackPtr& callback);
+ // Channel connected
+ virtual void channelConnected(const OperationCallbackPtr& callback);
- void setState(State s);
+ // Start sending buffered requests to target host
+ void startSending();
- EventDispatcher& dispatcher;
+ // Write a buffer to underlying socket
+ virtual void writeBuffer(boost::asio::streambuf& buffer,
+ const OperationCallbackPtr& callback) = 0;
+
+ // Read a message from underlying socket
+ virtual void readMsgSize(boost::asio::streambuf& buffer) = 0;
+ virtual void readMsgBody(boost::asio::streambuf& buffer,
+ int toread, uint32_t msgSize) = 0;
+ // whether the channel is closed (or being closed)
+ bool isClosed();
+
+ // close the underlying socket to release resource
+ virtual void closeSocket() = 0;
+
+ enum State { UNINITIALISED, CONNECTING, CONNECTED, DEAD };
+ void setState(State s);
+
+ // Address
HostAddress address;
+ private:
ChannelHandlerPtr handler;
boost::asio::io_service &service;
- boost::asio::ip::tcp::socket socket;
+
+ // buffers for input stream
boost::asio::streambuf in_buf;
std::istream instream;
-
- // only exists because protobufs can't play nice with streams (if there's more than message len in it, it tries to read all)
+
+ // only exists because protobufs can't play nice with streams
+ // (if there's more than message len in it, it tries to read all)
char* copy_buf;
std::size_t copy_buf_length;
+ // buffers for output stream
boost::asio::streambuf out_buf;
-
+ // write requests queue
typedef std::pair<PubSubRequestPtr, OperationCallbackPtr> WriteRequest;
boost::mutex write_lock;
std::deque<WriteRequest> write_queue;
+ // channel state
State state;
boost::shared_mutex state_lock;
+ // reading state
bool receiving;
bool reading;
PubSubResponsePtr outstanding_response;
boost::mutex receiving_lock;
-
+
+ // sending state
bool sending;
boost::mutex sending_lock;
+ // flag indicating that the channel is closed;
+ // some callbacks might still return while the channel is closing
+ bool closed;
+
+ // transactions
typedef std::tr1::unordered_map<long, PubSubDataPtr> TransactionMap;
TransactionMap txnid2data;
boost::mutex txnid2data_lock;
boost::shared_mutex destruction_lock;
};
+
+ class AsioDuplexChannel : public AbstractDuplexChannel {
+ public:
+ AsioDuplexChannel(IOServicePtr& service,
+ const HostAddress& addr,
+ const ChannelHandlerPtr& handler);
+ virtual ~AsioDuplexChannel();
+ protected:
+ // execute the connect operation
+ virtual void doConnect(const OperationCallbackPtr& callback);
+
+ // Execute the action after channel connect
+ // It would be executed in asio connect callback handler
+ virtual void setSocketOption(boost::system::error_code& ec);
+ virtual boost::asio::ip::tcp::endpoint
+ getRemoteAddress(boost::system::error_code& ec);
+ virtual boost::asio::ip::tcp::endpoint
+ getLocalAddress(boost::system::error_code& ec);
+
+ // Write a buffer to underlying socket
+ virtual void writeBuffer(boost::asio::streambuf& buffer,
+ const OperationCallbackPtr& callback);
+
+ // Read a message from underlying socket
+ virtual void readMsgSize(boost::asio::streambuf& buffer);
+ virtual void readMsgBody(boost::asio::streambuf& buffer,
+ int toread, uint32_t msgSize);
+
+ // close the underlying socket to release resource
+ virtual void closeSocket();
+ private:
+ // underlying socket
+ boost_socket_ptr socket;
+ };
+
+ typedef boost::shared_ptr<AsioDuplexChannel> AsioDuplexChannelPtr;
+
+ class AsioSSLDuplexChannel;
+ typedef boost::shared_ptr<AsioSSLDuplexChannel> AsioSSLDuplexChannelPtr;
+
+ class AsioSSLDuplexChannel : public AbstractDuplexChannel {
+ public:
+ AsioSSLDuplexChannel(IOServicePtr& service,
+ const boost_ssl_context_ptr& sslCtx,
+ const HostAddress& addr,
+ const ChannelHandlerPtr& handler);
+ virtual ~AsioSSLDuplexChannel();
+ protected:
+ // execute the connect operation
+ virtual void doConnect(const OperationCallbackPtr& callback);
+ // Execute the action after channel connect
+ // It would be executed in asio connect callback handler
+ virtual void setSocketOption(boost::system::error_code& ec);
+ virtual boost::asio::ip::tcp::endpoint
+ getRemoteAddress(boost::system::error_code& ec);
+ virtual boost::asio::ip::tcp::endpoint
+ getLocalAddress(boost::system::error_code& ec);
+
+ virtual void channelConnected(const OperationCallbackPtr& callback);
+
+ // Start the SSL handshake after the channel is connected
+ void startHandShake(const OperationCallbackPtr& callback);
+ // Asio callback invoked after the SSL handshake
+ static void handleHandshake(AsioSSLDuplexChannelPtr channel,
+ OperationCallbackPtr callback,
+ const boost::system::error_code& error);
+
+ void sslChannelConnected(const OperationCallbackPtr& callback);
+
+ // Write a buffer to underlying socket
+ virtual void writeBuffer(boost::asio::streambuf& buffer,
+ const OperationCallbackPtr& callback);
+
+ // Read a message from underlying socket
+ virtual void readMsgSize(boost::asio::streambuf& buffer);
+ virtual void readMsgBody(boost::asio::streambuf& buffer,
+ int toread, uint32_t msgSize);
+
+ // close the underlying socket to release resource
+ virtual void closeSocket();
+
+ private:
+#ifndef __APPLE__
+ // Shutdown ssl
+ void sslShutdown();
+ // Handle ssl shutdown
+ void handleSSLShutdown(const boost::system::error_code& error);
+#endif
+ // Close lowest layer
+ void closeLowestLayer();
+
+ // underlying ssl socket
+ boost_ssl_socket_ptr ssl_socket;
+ // ssl context
+ boost_ssl_context_ptr ssl_ctx;
+
+#ifndef __APPLE__
+ // Flag indicating that SSL is closed.
+ bool sslclosed;
+ boost::mutex sslclosed_lock;
+ boost::condition_variable sslclosed_cond;
+#endif
+ };
struct DuplexChannelPtrHash : public std::unary_function<DuplexChannelPtr, size_t> {
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/client.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/client.cpp?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/client.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/client.cpp Mon Oct 1 13:36:31 2012
@@ -39,6 +39,8 @@ const std::string Configuration::SYNC_RE
const std::string Configuration::SUBSCRIBER_AUTOCONSUME = "hedwig.cpp.subscriber_autoconsume";
const std::string Configuration::NUM_DISPATCH_THREADS = "hedwig.cpp.num_dispatch_threads";
const std::string Configuration::SUBSCRIPTION_MESSAGE_BOUND = "hedwig.cpp.subscription_message_bound";
+const std::string Configuration::RUN_AS_SSL_MODE = "hedwig.cpp.ssl_mode";
+const std::string Configuration::SSL_PEM_FILE = "hedwig.cpp.ssl_pem";
Client::Client(const Configuration& conf) {
LOG4CXX_DEBUG(logger, "Client::Client (" << this << ")");
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.cpp?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.cpp Mon Oct 1 13:36:31 2012
@@ -30,7 +30,6 @@ static log4cxx::LoggerPtr logger(log4cxx
using namespace Hedwig;
const std::string DEFAULT_SERVER_DEFAULT_VAL = "";
-const int DEFAULT_NUM_DISPATCH_THREADS = 1;
void SyncOperationCallback::wait() {
boost::unique_lock<boost::mutex> lock(mut);
@@ -182,25 +181,25 @@ long ClientTxnCounter::next() { // woul
ClientImplPtr ClientImpl::Create(const Configuration& conf) {
ClientImplPtr impl(new ClientImpl(conf));
LOG4CXX_DEBUG(logger, "Creating Clientimpl " << impl);
-
impl->dispatcher->start();
-
return impl;
}
void ClientImpl::Destroy() {
LOG4CXX_DEBUG(logger, "destroying Clientimpl " << this);
- dispatcher->stop();
{
boost::lock_guard<boost::shared_mutex> lock(allchannels_lock);
shuttingDownFlag = true;
for (ChannelMap::iterator iter = allchannels.begin(); iter != allchannels.end(); ++iter ) {
- (*iter)->kill();
+ (*iter)->close();
}
allchannels.clear();
}
+ // SSL Channel shutdown needs to send packets to server
+ // so we only stop dispatcher after all channels are closed
+ dispatcher->stop();
/* destruction of the maps will clean up any items they hold */
@@ -217,8 +216,10 @@ void ClientImpl::Destroy() {
ClientImpl::ClientImpl(const Configuration& conf)
: conf(conf), publisher(NULL), subscriber(NULL), counterobj(), shuttingDownFlag(false)
{
- defaultHost = HostAddress::fromString(conf.get(Configuration::DEFAULT_SERVER, DEFAULT_SERVER_DEFAULT_VAL));
- dispatcher = EventDispatcherPtr(new EventDispatcher(conf.getInt(Configuration::NUM_DISPATCH_THREADS, DEFAULT_NUM_DISPATCH_THREADS)));
+ defaultHost = HostAddress::fromString(conf.get(Configuration::DEFAULT_SERVER,
+ DEFAULT_SERVER_DEFAULT_VAL));
+ dispatcher = EventDispatcherPtr(new EventDispatcher(conf));
+ channelManager = DuplexChannelManager::create(conf, *dispatcher);
}
Subscriber& ClientImpl::getSubscriber() {
@@ -275,15 +276,13 @@ void ClientImpl::redirectRequest(const D
if (data->getType() == SUBSCRIBE) {
// a redirect for subscription, kill old channel and remove old channel from all channels list
// otherwise old channel will not be destroyed, caused lost of CLOSE_WAIT connections
- channel->kill();
- {
- boost::lock_guard<boost::shared_mutex> aclock(allchannels_lock);
- allchannels.erase(channel); // channel should be deleted here
- }
+ removeAndCloseChannel(channel);
+
SubscriberClientChannelHandlerPtr handler(new SubscriberClientChannelHandler(shared_from_this(),
this->getSubscriberImpl(), data));
newchannel = createChannel(data->getTopic(), handler);
handler->setChannel(newchannel);
+ newchannel->connect();
getSubscriberImpl().doSubscribe(newchannel, data, handler);
} else if (data->getType() == PUBLISH) {
newchannel = getChannel(data->getTopic());
@@ -314,14 +313,15 @@ DuplexChannelPtr ClientImpl::createChann
setHostForTopic(topic, addr);
}
- DuplexChannelPtr channel(new DuplexChannel(*dispatcher, addr, conf, handler));
+ DuplexChannelPtr channel = channelManager->createChannel(addr, handler);
boost::lock_guard<boost::shared_mutex> lock(allchannels_lock);
if (shuttingDownFlag) {
- channel->kill();
+ channel->close();
throw ShuttingDownException();
}
- channel->connect();
+ // Don't connect here, otherwise connect callback may be triggered before setChannel
+ // channel->connect();
allchannels.insert(channel);
LOG4CXX_DEBUG(logger, "(create) All channels size: " << allchannels.size());
@@ -345,6 +345,7 @@ DuplexChannelPtr ClientImpl::getChannel(
LOG4CXX_DEBUG(logger, " No channel for topic, creating new channel.get() " << channel.get() << " addr " << addr.getAddressString());
ChannelHandlerPtr handler(new HedwigClientChannelHandler(shared_from_this()));
channel = createChannel(topic, handler);
+ channel->connect();
boost::lock_guard<boost::shared_mutex> lock(host2channel_lock);
host2channel[addr] = channel;
@@ -376,7 +377,6 @@ void ClientImpl::channelDied(const Duple
boost::lock_guard<boost::shared_mutex> h2tlock(host2topics_lock);
boost::lock_guard<boost::shared_mutex> h2clock(host2channel_lock);
boost::lock_guard<boost::shared_mutex> t2hlock(topic2host_lock);
- boost::lock_guard<boost::shared_mutex> aclock(allchannels_lock);
// get host
HostAddress addr = channel->getHostAddress();
@@ -385,8 +385,13 @@ void ClientImpl::channelDied(const Duple
}
host2topics.erase(addr);
host2channel.erase(addr);
+ removeAndCloseChannel(channel);
+}
+void ClientImpl::removeAndCloseChannel(const DuplexChannelPtr& channel) {
+ boost::lock_guard<boost::shared_mutex> aclock(allchannels_lock);
allchannels.erase(channel); // channel should be deleted here
+ channel->close(); // close channel
}
const Configuration& ClientImpl::getConfiguration() {
@@ -394,5 +399,5 @@ const Configuration& ClientImpl::getConf
}
boost::asio::io_service& ClientImpl::getService() {
- return dispatcher->getService();
+ return dispatcher->getService()->getService();
}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.h?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/clientimpl.h Mon Oct 1 13:36:31 2012
@@ -206,6 +206,7 @@ namespace Hedwig {
void setChannelForHost(const HostAddress& address, const DuplexChannelPtr& channel);
void channelDied(const DuplexChannelPtr& channel);
+ void removeAndCloseChannel(const DuplexChannelPtr& channel);
bool shuttingDown() const;
SubscriberImpl& getSubscriberImpl();
@@ -230,7 +231,8 @@ namespace Hedwig {
typedef boost::shared_ptr<EventDispatcher> EventDispatcherPtr;
EventDispatcherPtr dispatcher;
-
+ DuplexChannelManagerPtr channelManager;
+
typedef std::tr1::unordered_multimap<HostAddress, std::string, HostAddressHash > Host2TopicsMap;
Host2TopicsMap host2topics;
boost::shared_mutex host2topics_lock;
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.cpp Mon Oct 1 13:36:31 2012
@@ -27,41 +27,71 @@ static log4cxx::LoggerPtr logger(log4cxx
using namespace Hedwig;
-EventDispatcher::EventDispatcher(int numThreads)
- : num_threads(numThreads), running(false), next_io_service(0) {
- if (0 == num_threads) {
- throw std::runtime_error("number of threads in dispatcher is zero");
- }
- for (size_t i = 0; i < num_threads; i++) {
- io_service_ptr service(new boost::asio::io_service);
- services.push_back(service);
+const int DEFAULT_NUM_DISPATCH_THREADS = 1;
+
+IOService::IOService() {
+}
+
+IOService::~IOService() {}
+
+void IOService::start() {
+ if (work.get()) {
+ return;
}
+ work = work_ptr(new boost::asio::io_service::work(service));
}
-void EventDispatcher::run_forever(io_service_ptr service, size_t idx) {
- LOG4CXX_DEBUG(logger, "Starting event dispatcher " << idx);
+void IOService::stop() {
+ if (!work.get()) {
+ return;
+ }
+
+ work = work_ptr();
+ service.stop();
+}
+void IOService::run() {
while (true) {
try {
- service->run();
+ service.run();
break;
} catch (std::exception &e) {
- LOG4CXX_ERROR(logger, "Exception in dispatch handler " << idx << " : " << e.what());
+ LOG4CXX_ERROR(logger, "Exception in IO Service " << this << " : " << e.what());
}
}
- LOG4CXX_DEBUG(logger, "Event dispatcher " << idx << " done");
+}
+
+EventDispatcher::EventDispatcher(const Configuration& conf)
+ : conf(conf), running(false), next_io_service(0) {
+ num_threads = conf.getInt(Configuration::NUM_DISPATCH_THREADS,
+ DEFAULT_NUM_DISPATCH_THREADS);
+ if (0 == num_threads) {
+ throw std::runtime_error("number of threads in dispatcher is zero");
+ }
+ for (size_t i = 0; i < num_threads; i++) {
+ services.push_back(IOServicePtr(new IOService()));
+ }
+}
+
+void EventDispatcher::run_forever(IOServicePtr service, size_t idx) {
+ LOG4CXX_INFO(logger, "Starting event dispatcher " << idx);
+
+ service->run();
+
+ LOG4CXX_INFO(logger, "Event dispatcher " << idx << " done");
}
void EventDispatcher::start() {
if (running) {
return;
}
+
for (size_t i = 0; i < num_threads; i++) {
- io_service_ptr service = services[i];
- work_ptr work(new boost::asio::io_service::work(*service));
- works.push_back(work);
+ IOServicePtr service = services[i];
+ service->start();
// new thread
- thread_ptr t(new boost::thread(boost::bind(&EventDispatcher::run_forever, this, service, i)));
+ thread_ptr t(new boost::thread(boost::bind(&EventDispatcher::run_forever,
+ this, service, i)));
threads.push_back(t);
}
running = true;
@@ -72,8 +102,6 @@ void EventDispatcher::stop() {
return;
}
- works.clear();
-
for (size_t i = 0; i < num_threads; i++) {
services[i]->stop();
}
@@ -90,13 +118,12 @@ EventDispatcher::~EventDispatcher() {
services.clear();
}
-boost::asio::io_service& EventDispatcher::getService() {
+IOServicePtr& EventDispatcher::getService() {
size_t next = 0;
{
boost::lock_guard<boost::mutex> lock(next_lock);
next = next_io_service;
next_io_service = (next_io_service + 1) % num_threads;
}
- boost::asio::io_service& service = *services[next];
- return service;
+ return services[next];
}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.h?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/eventdispatcher.h Mon Oct 1 13:36:31 2012
@@ -20,37 +20,62 @@
#include <vector>
+#include <hedwig/client.h>
+
#include <boost/asio.hpp>
#include <boost/thread.hpp>
-#include <boost/thread/mutex.hpp>
#include <boost/shared_ptr.hpp>
namespace Hedwig {
- typedef boost::shared_ptr<boost::asio::io_service> io_service_ptr;
typedef boost::shared_ptr<boost::asio::io_service::work> work_ptr;
typedef boost::shared_ptr<boost::thread> thread_ptr;
+ class IOService;
+ typedef boost::shared_ptr<IOService> IOServicePtr;
+
+ class IOService {
+ public:
+ IOService();
+ virtual ~IOService();
+
+ // start the io service
+ void start();
+ // stop the io service
+ void stop();
+ // run the io service
+ void run();
+
+ inline boost::asio::io_service& getService() {
+ return service;
+ }
+
+ private:
+ boost::asio::io_service service;
+ work_ptr work;
+ };
+
class EventDispatcher {
public:
- EventDispatcher(int numThreads = 1);
+ EventDispatcher(const Configuration& conf);
~EventDispatcher();
void start();
+
void stop();
- boost::asio::io_service& getService();
-
+ IOServicePtr& getService();
+
private:
- void run_forever(io_service_ptr service, size_t idx);
+ void run_forever(IOServicePtr service, size_t idx);
+
+ const Configuration& conf;
// number of threads
size_t num_threads;
// running flag
bool running;
// pool of io_services.
- std::vector<io_service_ptr> services;
- // pool of works
- std::vector<work_ptr> works;
+ std::vector<IOServicePtr> services;
// threads
std::vector<thread_ptr> threads;
// next io_service used for a connection
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.cpp Mon Oct 1 13:36:31 2012
@@ -129,7 +129,8 @@ void SubscriberConsumeCallback::operatio
LOG4CXX_ERROR(logger, "Error passing message to client transaction: " << data->getTxnId() << " error: " << exception.what()
<< " retrying in " << retrywait << " Microseconds");
- boost::asio::deadline_timer t(handler->getChannel()->getService(), boost::posix_time::milliseconds(retrywait));
+ AbstractDuplexChannelPtr chPtr = boost::dynamic_pointer_cast<AbstractDuplexChannel>(handler->getChannel());
+ boost::asio::deadline_timer t(chPtr->getService(), boost::posix_time::milliseconds(retrywait));
t.async_wait(boost::bind(&SubscriberConsumeCallback::timerComplete, handler, m, boost::asio::placeholders::error));
}
@@ -179,8 +180,23 @@ void SubscriberClientChannelHandler::mes
void SubscriberClientChannelHandler::close() {
closed = true;
+ // cancel reconnect timer
+ if (reconnectTimer.get()) {
+ boost::system::error_code ec;
+ reconnectTimer->cancel(ec);
+ if (ec) {
+ LOG4CXX_WARN(logger, "Handler " << this << " cancel reconnect task " << reconnectTimer.get() << " error :" << ec.message().c_str());
+ }
+ }
+
+ LOG4CXX_INFO(logger, "close subscription handler " << this << " for (topic:" << origData->getTopic()
+ << ", subscriberId:" << origData->getSubscriberId() << ").");
if (channel.get()) {
- channel->kill();
+ // need to ensure the channel is removed from allchannels list
+ // since it will be killed
+ client->removeAndCloseChannel(channel);
+ LOG4CXX_INFO(logger, "removed subscription channel " << channel.get() << " for (topic: " << origData->getTopic()
+ << ", subscriberId:" << origData->getSubscriberId() << ").");
}
}
@@ -236,9 +252,10 @@ void SubscriberClientChannelHandler::rec
if (should_wait) {
int retrywait = client->getConfiguration().getInt(Configuration::RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME,
DEFAULT_RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME);
-
+ AbstractDuplexChannelPtr chPtr = boost::dynamic_pointer_cast<AbstractDuplexChannel>(channel);
// set reconnect timer
- reconnectTimer = ReconnectTimerPtr(new boost::asio::deadline_timer(channel->getService(), boost::posix_time::milliseconds(retrywait)));
+ reconnectTimer = ReconnectTimerPtr(new boost::asio::deadline_timer(chPtr->getService(),
+ boost::posix_time::milliseconds(retrywait)));
reconnectTimer->async_wait(boost::bind(&SubscriberClientChannelHandler::reconnectTimerComplete, shared_from_this(),
channel, e, boost::asio::placeholders::error));
return;
@@ -256,6 +273,8 @@ void SubscriberClientChannelHandler::rec
DuplexChannelPtr newchannel = client->createChannel(origData->getTopic(), baseptr);
newhandler->setChannel(newchannel);
+ newchannel->connect();
+ LOG4CXX_DEBUG(logger, "Create a new channel " << newchannel.get() << " to handover delivery to new handler " << newhandler.get());
handoverDelivery(newhandler);
// remove record of the failed channel from the subscriber
@@ -368,6 +387,10 @@ void SubscriberImpl::asyncSubscribe(cons
DuplexChannelPtr channel = client->createChannel(topic, handler);
handler->setChannel(channel);
+ channel->connect();
+ LOG4CXX_INFO(logger, "New handler " << handler.get() << " on channel " << channel.get()
+ << " is created for (topic:" << topic << ", subscriber:" << subscriberId << ", txn:"
+ << data->getTxnId() << ").");
doSubscribe(channel, data, handler);
}
@@ -377,15 +400,19 @@ void SubscriberImpl::doSubscribe(const D
OperationCallbackPtr writecb(new SubscriberWriteCallback(client, data));
channel->writeRequest(data->getRequest(), writecb);
- boost::lock_guard<boost::shared_mutex> lock(topicsubscriber2handler_lock);
- TopicSubscriber t(data->getTopic(), data->getSubscriberId());
- SubscriberClientChannelHandlerPtr oldhandler = topicsubscriber2handler[t];
- if (oldhandler != NULL) {
+ SubscriberClientChannelHandlerPtr oldhandler;
+ {
+ boost::lock_guard<boost::shared_mutex> lock(topicsubscriber2handler_lock);
+ TopicSubscriber t(data->getTopic(), data->getSubscriberId());
+ oldhandler = topicsubscriber2handler[t];
+ topicsubscriber2handler[t] = handler;
+ }
+ if (oldhandler.get() != NULL) {
+ LOG4CXX_DEBUG(logger, "(topic:" << data->getTopic() << ", subscriber:" << data->getSubscriberId()
+ << ") handover delivery from old handler " << oldhandler.get()
+ << " to new handler " << handler.get());
oldhandler->handoverDelivery(handler);
}
- topicsubscriber2handler[t] = handler;
-
- LOG4CXX_DEBUG(logger, "Set topic subscriber for topic(" << data->getTopic() << ") subscriberId(" << data->getSubscriberId() << ") to " << handler.get() << " topicsubscriber2topic(" << &topicsubscriber2handler << ")");
}
void SubscriberImpl::unsubscribe(const std::string& topic, const std::string& subscriberId) {
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/subscriberimpl.h Mon Oct 1 13:36:31 2012
@@ -107,7 +107,8 @@ namespace Hedwig {
class SubscriberClientChannelHandler : public HedwigClientChannelHandler,
public boost::enable_shared_from_this<SubscriberClientChannelHandler> {
public:
- SubscriberClientChannelHandler(const ClientImplPtr& client, SubscriberImpl& subscriber, const PubSubDataPtr& data);
+ SubscriberClientChannelHandler(const ClientImplPtr& client, SubscriberImpl& subscriber,
+ const PubSubDataPtr& data);
~SubscriberClientChannelHandler();
void messageReceived(const DuplexChannelPtr& channel, const PubSubResponsePtr& m);
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.cpp?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.cpp Mon Oct 1 13:36:31 2012
@@ -39,7 +39,7 @@ const std::string UNITIALISED_HOST("UNIN
const int DEFAULT_PORT = 4080;
const int DEFAULT_SSL_PORT = 9876;
-HostAddress::HostAddress() : initialised(false), address_str() {
+HostAddress::HostAddress() : initialised(false), address_str(), ssl_host_port(0) {
}
HostAddress::~HostAddress() {
@@ -65,13 +65,23 @@ uint32_t HostAddress::ip() const {
return host_ip;
}
+void HostAddress::updateIP(uint32_t ip) {
+ this->host_ip = ip;
+}
+
uint16_t HostAddress::port() const {
return host_port;
}
+uint16_t HostAddress::sslPort() const {
+ return ssl_host_port;
+}
+
void HostAddress::parse_string() {
char* url = strdup(address_str.c_str());
+ LOG4CXX_DEBUG(logger, "Parse address : " << url);
+
if (url == NULL) {
LOG4CXX_ERROR(logger, "You seems to be out of memory");
throw OomException();
@@ -130,6 +140,7 @@ void HostAddress::parse_string() {
host_ip = ntohl(socket_addr.sin_addr.s_addr);
host_port = ntohs(socket_addr.sin_port);
+ ssl_host_port = sslport;
freeaddrinfo(addr);
free((void*)url);
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.h
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.h?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.h (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/lib/util.h Mon Oct 1 13:36:31 2012
@@ -53,6 +53,11 @@ namespace Hedwig {
const std::string& getAddressString() const;
uint32_t ip() const;
uint16_t port() const;
+ uint16_t sslPort() const;
+
+ // Overwrites the stored IP. When the default server is a VIP, the
+ // real server's IP address differs from the default server's.
+ void updateIP(uint32_t ip);
static HostAddress fromString(std::string host);
@@ -64,6 +69,7 @@ namespace Hedwig {
std::string address_str;
uint32_t host_ip;
uint16_t host_port;
+ uint16_t ssl_host_port;
};
/**
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/log4cxx.conf
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/log4cxx.conf?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/log4cxx.conf (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/log4cxx.conf Mon Oct 1 13:36:31 2012
@@ -22,20 +22,21 @@
log4j.appender.rootAppender=org.apache.log4j.ConsoleAppender
log4j.appender.rootAppender.layout=org.apache.log4j.BasicLayout
-#log4j.appender.hedwig=org.apache.log4j.RollingFileAppender
-log4j.appender.hedwig=org.apache.log4j.ConsoleAppender
-#log4j.appender.hedwig.fileName=./testLog.log
+log4j.appender.hedwig=org.apache.log4j.RollingFileAppender
+#log4j.appender.hedwig=org.apache.log4j.ConsoleAppender
+log4j.appender.hedwig.fileName=./testLog.log
log4j.appender.hedwig.layout=org.apache.log4j.PatternLayout
log4j.appender.hedwig.layout.ConversionPattern=[%d{%H:%M:%S.%l}] %t %p %c - %m%n
-log4j.appender.hedwigtest=org.apache.log4j.ConsoleAppender
-#log4j.appender.hedwig.fileName=./testLog.log
+log4j.appender.hedwigtest=org.apache.log4j.RollingFileAppender
+#log4j.appender.hedwigtest=org.apache.log4j.ConsoleAppender
+log4j.appender.hedwigtest.fileName=./testLog.log
log4j.appender.hedwigtest.layout=org.apache.log4j.PatternLayout
log4j.appender.hedwigtest.layout.ConversionPattern=[%d{%H:%M:%S.%l}] %t %p %c - %m%n
# category
-log4j.category.hedwig=OFF, hedwig
-log4j.category.hedwigtest=OFF, hedwigtest
+log4j.category.hedwig=DEBUG, hedwig
+log4j.category.hedwigtest=DEBUG, hedwigtest
log4j.rootCategory=OFF
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/server-control.sh
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/server-control.sh?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/server-control.sh (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/server-control.sh Mon Oct 1 13:36:31 2012
@@ -94,6 +94,7 @@ start_hw_server () {
REGION=$1
COUNT=$2
PORT=$((4080+$COUNT))
+ SSL_PORT=$((9876+$COUNT))
export HEDWIG_LOG_CONF=/tmp/hw-log4j-$COUNT.properties
cat > $HEDWIG_LOG_CONF <<EOF
@@ -112,6 +113,8 @@ log4j.appender.ROLLINGFILE.MaxFileSize=1
#log4j.appender.ROLLINGFILE.MaxBackupIndex=10
log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n
+log4j.logger.org.apache.zookeeper=OFF,ROLLINGFILE
+log4j.logger.org.apache.hedwig.zookeeper=OFF,ROLLINGFILE
EOF
export HEDWIG_SERVER_CONF=/tmp/hw-server-$COUNT.conf
@@ -122,9 +125,11 @@ zk_timeout=2000
# The port at which the clients will connect.
server_port=$PORT
# The SSL port at which the clients will connect (only if SSL is enabled).
-ssl_server_port=9876
+ssl_server_port=$SSL_PORT
# Flag indicating if the server should also operate in SSL mode.
-ssl_enabled=false
+ssl_enabled=true
+cert_path=$PWD/../../../../../hedwig-server/src/main/resources/server.p12
+password=eUySvp2phM2Wk
region=$REGION
EOF
sh $HWSCRIPT server 2>&1 > hwoutput.$COUNT.log &
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/tester.sh
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/tester.sh?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/tester.sh (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/scripts/tester.sh Mon Oct 1 13:36:31 2012
@@ -24,15 +24,19 @@ export LOG4CXX_CONF=`pwd`/log4cxx.conf
source network-delays.sh
source server-control.sh
-all() {
+runtest() {
if [ "z$HEDWIG_NETWORK_DELAY" != "z" ]; then
setup_delays $HEDWIG_NETWORK_DELAY
fi
stop_cluster;
start_cluster;
+ if [ "z$1" != "z" ]; then
+ ../test/hedwigtest -s true
+ else
+ ../test/hedwigtest
+ fi
- ../test/hedwigtest
RESULT=$?
stop_cluster;
@@ -101,6 +105,12 @@ case "$1" in
stop-cluster)
stop_cluster
;;
+ simple-test)
+ runtest
+ ;;
+ ssl-test)
+ runtest ssl
+ ;;
setup-delays)
setup_delays $2
;;
@@ -108,7 +118,8 @@ case "$1" in
clear_delays
;;
all)
- all
+ runtest
+ runtest ssl
;;
singletest)
singletest $2
@@ -118,7 +129,13 @@ case "$1" in
Usage: tester.sh [command]
tester.sh all
- Run through the tests, setting up and cleaning up all prerequisites.
+ Run through the tests (both simple and ssl), setting up and cleaning up all prerequisites.
+
+tester.sh simple-test
+ Run through the tests (simple mode), setting up and cleaning up all prerequisites.
+
+tester.sh ssl-test
+ Run through the tests (ssl mode), setting up and cleaning up all prerequisites.
tester.sh singletest <name>
Run a single test
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/Makefile.am Mon Oct 1 13:36:31 2012
@@ -26,6 +26,13 @@ hedwigtest_LDFLAGS = -no-undefined $(BOO
check: hedwigtest
bash ../scripts/tester.sh all
+
+sslcheck: hedwigtest
+ bash ../scripts/tester.sh ssl-test
+
+simplecheck: hedwigtest
+ bash ../scripts/tester.sh simple-test
+
else
check:
@echo "\n\nYou haven't configured with gtest. Run the ./configure command with --enable-gtest=<path_to_gtest>"
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/main.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/main.cpp?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/main.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/main.cpp Mon Oct 1 13:36:31 2012
@@ -34,6 +34,9 @@
#include "gtest/gtest.h"
+bool TestServerConfiguration::isSSL = false;
+std::string TestServerConfiguration::certFile = "";
+
int main( int argc, char **argv)
{
try {
@@ -48,6 +51,25 @@ int main( int argc, char **argv)
} catch (...) {
std::cerr << "unknown exception while configuring log4cpp vi'." << std::endl;
}
+
+ // Enable SSL for testing
+ int opt;
+ while((opt = getopt(argc,argv,"s:c:")) > 0) {
+ switch(opt) {
+ case 's':
+ if (std::string(optarg) == "true") {
+ std::cout << "run in ssl mode...." << std::endl;
+ TestServerConfiguration::isSSL = true;
+ } else {
+ TestServerConfiguration::isSSL = false;
+ }
+ break;
+ case 'c':
+ std::cout << "use cert file :" << optarg << std::endl;
+ TestServerConfiguration::certFile = std::string(optarg);
+ break;
+ }//switch
+ }//while
::testing::InitGoogleTest(&argc, argv);
int ret = RUN_ALL_TESTS();
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messageboundtest.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messageboundtest.cpp?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messageboundtest.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messageboundtest.cpp Mon Oct 1 13:36:31 2012
@@ -33,32 +33,16 @@
static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__));
-
-class MessageBoundConfiguration : public Hedwig::Configuration {
+class MessageBoundConfiguration : public TestServerConfiguration {
public:
- MessageBoundConfiguration() : address("localhost:4081") {}
+ MessageBoundConfiguration() : TestServerConfiguration() {}
virtual int getInt(const std::string& key, int defaultVal) const {
if (key == Configuration::SUBSCRIPTION_MESSAGE_BOUND) {
return 5;
}
- return defaultVal;
- }
-
- virtual const std::string get(const std::string& key, const std::string& defaultVal) const {
- if (key == Configuration::DEFAULT_SERVER) {
- return address;
- } else {
- return defaultVal;
- }
+ return TestServerConfiguration::getInt(key, defaultVal);
}
-
- virtual bool getBool(const std::string& /*key*/, bool defaultVal) const {
- return defaultVal;
- }
-
-protected:
- const std::string address;
};
class MessageBoundOrderCheckingMessageHandlerCallback : public Hedwig::MessageHandlerCallback {
@@ -91,10 +75,16 @@ protected:
void sendXExpectLastY(Hedwig::Publisher& pub, Hedwig::Subscriber& sub, const std::string& topic,
const std::string& subid, int X, int Y) {
- for (int i = 0; i < X; i++) {
+ for (int i = 0; i < X;) {
std::stringstream oss;
oss << i;
- pub.publish(topic, oss.str());
+ try {
+ pub.publish(topic, oss.str());
+ ++i;
+ } catch (std::exception &e) {
+ LOG4CXX_WARN(logger, "Exception when publishing message " << i << " : "
+ << e.what());
+ }
}
sub.subscribe(topic, subid, Hedwig::SubscribeRequest::ATTACH);
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messagefiltertest.cpp
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messagefiltertest.cpp?rev=1392319&r1=1392318&r2=1392319&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messagefiltertest.cpp (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/cpp/test/messagefiltertest.cpp Mon Oct 1 13:36:31 2012
@@ -33,32 +33,17 @@
static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__));
-class MessageFilterConfiguration : public Hedwig::Configuration {
+class MessageFilterConfiguration : public TestServerConfiguration {
public:
- MessageFilterConfiguration() : address("localhost:4081") {}
-
- virtual int getInt(const std::string& key, int defaultVal) const {
- return defaultVal;
- }
-
- virtual const std::string get(const std::string& key, const std::string& defaultVal) const {
- if (key == Configuration::DEFAULT_SERVER) {
- return address;
- } else {
- return defaultVal;
- }
- }
+ MessageFilterConfiguration() : TestServerConfiguration() {}
virtual bool getBool(const std::string& key, bool defaultVal) const {
if (key == Configuration::SUBSCRIBER_AUTOCONSUME) {
return false;
} else {
- return defaultVal;
+ return TestServerConfiguration::getBool(key, defaultVal);
}
}
-
- protected:
- const std::string address;
};
class ModMessageFilter : public Hedwig::ClientMessageFilter {