You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:41 UTC
[31/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/inc/hedwig/callback.h
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/inc/hedwig/callback.h b/hedwig-client/src/main/cpp/inc/hedwig/callback.h
deleted file mode 100644
index 80e961b..0000000
--- a/hedwig-client/src/main/cpp/inc/hedwig/callback.h
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef HEDWIG_CALLBACK_H
-#define HEDWIG_CALLBACK_H
-
-#include <string>
-#include <hedwig/exceptions.h>
-#include <hedwig/protocol.h>
-
-#ifdef USE_BOOST_TR1
-#include <boost/tr1/memory.hpp>
-#else
-#include <tr1/memory>
-#endif
-
-namespace Hedwig {
-
- //
- // A listener registered with a Subscriber instance to receive events
- // for subscriptions that have resubscribe disabled.
- //
- class SubscriptionListener {
- public:
- virtual void processEvent(const std::string &topic, const std::string &subscriberId,
- const Hedwig::SubscriptionEvent event) = 0;
- virtual ~SubscriptionListener() {};
- };
- typedef std::tr1::shared_ptr<SubscriptionListener> SubscriptionListenerPtr;
-
- template<class R>
- class Callback {
- public:
- virtual void operationComplete(const R& result) = 0;
- virtual void operationFailed(const std::exception& exception) = 0;
-
- virtual ~Callback() {};
- };
-
- class OperationCallback {
- public:
- virtual void operationComplete() = 0;
- virtual void operationFailed(const std::exception& exception) = 0;
-
- virtual ~OperationCallback() {};
- };
- typedef std::tr1::shared_ptr<OperationCallback> OperationCallbackPtr;
-
- class MessageHandlerCallback {
- public:
- virtual void consume(const std::string& topic, const std::string& subscriberId, const Message& msg, OperationCallbackPtr& callback) = 0;
-
- virtual ~MessageHandlerCallback() {};
- };
- typedef std::tr1::shared_ptr<MessageHandlerCallback> MessageHandlerCallbackPtr;
-
- typedef std::tr1::shared_ptr<SubscriptionPreferences> SubscriptionPreferencesPtr;
-
- class ClientMessageFilter {
- public:
- virtual void setSubscriptionPreferences(const std::string& topic, const std::string& subscriberId,
- const SubscriptionPreferencesPtr& preferences) = 0;
- virtual bool testMessage(const Message& message) = 0;
-
- virtual ~ClientMessageFilter() {};
- };
- typedef std::tr1::shared_ptr<ClientMessageFilter> ClientMessageFilterPtr;
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/inc/hedwig/client.h
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/inc/hedwig/client.h b/hedwig-client/src/main/cpp/inc/hedwig/client.h
deleted file mode 100644
index 7b914bc..0000000
--- a/hedwig-client/src/main/cpp/inc/hedwig/client.h
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef HEDWIG_CLIENT_H
-#define HEDWIG_CLIENT_H
-
-#include <string>
-
-#ifdef USE_BOOST_TR1
-#include <boost/tr1/memory.hpp>
-#else
-#include <tr1/memory>
-#endif
-
-#include <hedwig/subscribe.h>
-#include <hedwig/publish.h>
-#include <hedwig/exceptions.h>
-#include <boost/noncopyable.hpp>
-#include <boost/shared_ptr.hpp>
-
-namespace Hedwig {
-
- class ClientImpl;
- typedef boost::shared_ptr<ClientImpl> ClientImplPtr;
-
- class Configuration {
- public:
- static const std::string DEFAULT_SERVER;
- static const std::string MESSAGE_CONSUME_RETRY_WAIT_TIME;
- static const std::string SUBSCRIBER_CONSUME_RETRY_WAIT_TIME;
- static const std::string MAX_MESSAGE_QUEUE_SIZE;
- static const std::string RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME;
- static const std::string SYNC_REQUEST_TIMEOUT;
- static const std::string SUBSCRIBER_AUTOCONSUME;
- static const std::string NUM_DISPATCH_THREADS;
- static const std::string SSL_ENABLED;
- static const std::string SSL_PEM_FILE;
- static const std::string SUBSCRIPTION_CHANNEL_SHARING_ENABLED;
- /**
- * The maximum number of messages the hub will queue for subscriptions
- * created using this configuration. The hub will always queue the most
- * recent messages. If there are enough publishes to the topic to hit
- * the bound, then the oldest messages are dropped from the queue.
- *
- * A bound of 0 disables the bound completely.
- */
- static const std::string SUBSCRIPTION_MESSAGE_BOUND;
-
- public:
- Configuration() {};
- virtual int getInt(const std::string& key, int defaultVal) const = 0;
- virtual const std::string get(const std::string& key, const std::string& defaultVal) const = 0;
- virtual bool getBool(const std::string& key, bool defaultVal) const = 0;
-
- virtual ~Configuration() {}
- };
-
- /**
- Main Hedwig client class. This class is used to acquire an instance of the Subscriber or Publisher.
- */
- class Client : private boost::noncopyable {
- public:
- Client(const Configuration& conf);
-
- /**
- Retrieve the subscriber object
- */
- Subscriber& getSubscriber();
-
- /**
- Retrieve the publisher object
- */
- Publisher& getPublisher();
-
- ~Client();
-
- private:
- ClientImplPtr clientimpl;
- };
-
-
-};
-
-#endif
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h b/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h
deleted file mode 100644
index b44fed9..0000000
--- a/hedwig-client/src/main/cpp/inc/hedwig/exceptions.h
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef HEDWIG_EXCEPTION_H
-#define HEDWIG_EXCEPTION_H
-
-#include <exception>
-
-namespace Hedwig {
-
- class ClientException : public std::exception { };
-
- class ClientTimeoutException : public ClientException {};
-
- class ServiceDownException : public ClientException {};
- class CannotConnectException : public ClientException {};
- class UnexpectedResponseException : public ClientException {};
- class OomException : public ClientException {};
- class UnknownRequestException : public ClientException {};
- class InvalidRedirectException : public ClientException {};
- class NoChannelHandlerException : public ClientException {};
-
- class PublisherException : public ClientException { };
-
- class SubscriberException : public ClientException { };
- class AlreadySubscribedException : public SubscriberException {};
- class NotSubscribedException : public SubscriberException {};
- class ResubscribeException : public SubscriberException {};
- class NullMessageHandlerException : public SubscriberException {};
- class NullMessageFilterException : public SubscriberException {};
-
- class AlreadyStartDeliveryException : public SubscriberException {};
- class StartingDeliveryException : public SubscriberException {};
-
- class ConfigurationException : public ClientException { };
- class InvalidPortException : public ConfigurationException {};
- class HostResolutionException : public ClientException {};
-
- class InvalidStateException : public ClientException {};
- class ShuttingDownException : public InvalidStateException {};
-};
-
-#endif
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/inc/hedwig/publish.h
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/inc/hedwig/publish.h b/hedwig-client/src/main/cpp/inc/hedwig/publish.h
deleted file mode 100644
index ea08838..0000000
--- a/hedwig-client/src/main/cpp/inc/hedwig/publish.h
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef HEDWIG_PUBLISH_H
-#define HEDWIG_PUBLISH_H
-
-#include <string>
-
-#include <hedwig/exceptions.h>
-#include <hedwig/callback.h>
-#include <hedwig/protocol.h>
-#include <boost/noncopyable.hpp>
-
-namespace Hedwig {
-
- typedef std::tr1::shared_ptr<PublishResponse> PublishResponsePtr;
- typedef Callback<PublishResponsePtr> PublishResponseCallback;
- typedef std::tr1::shared_ptr<PublishResponseCallback> PublishResponseCallbackPtr;
-
- /**
- Interface for publishing to a hedwig instance.
- */
- class Publisher : private boost::noncopyable {
- public:
- /**
- Publish message for topic, and block until we receive an ACK response from the hedwig server.
-
- @param topic Topic to publish to.
- @param message Data to publish for topic.
- */
- virtual PublishResponsePtr publish(const std::string& topic, const std::string& message) = 0;
-
- virtual PublishResponsePtr publish(const std::string& topic, const Message& message) = 0;
-
- /**
- Asynchronously publish message for topic.
-
- @code
- OperationCallbackPtr callback(new MyCallback());
- pub.asyncPublish(topic, message, callback);
- @endcode
-
- @param topic Topic to publish to.
- @param message Data to publish to topic
- @param callback Callback which will be used to report success or failure. Success is only reported once the server replies with an ACK response to the publication.
- */
- virtual void asyncPublish(const std::string& topic, const std::string& message, const OperationCallbackPtr& callback) = 0;
-
- virtual void asyncPublish(const std::string& topic, const Message& message, const OperationCallbackPtr& callback) = 0;
-
- virtual void asyncPublishWithResponse(const std::string& topic, const Message& messsage,
- const PublishResponseCallbackPtr& callback) = 0;
-
- virtual ~Publisher() {}
- };
-};
-
-#endif
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h b/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h
deleted file mode 100644
index 4bc718c..0000000
--- a/hedwig-client/src/main/cpp/inc/hedwig/subscribe.h
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef HEDWIG_SUBSCRIBE_H
-#define HEDWIG_SUBSCRIBE_H
-
-#include <string>
-
-#include <hedwig/exceptions.h>
-#include <hedwig/callback.h>
-#include <hedwig/protocol.h>
-#include <boost/noncopyable.hpp>
-
-namespace Hedwig {
-
- /**
- Interface for subscribing to a hedwig instance.
- */
- class Subscriber : private boost::noncopyable {
- public:
- virtual void subscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode) = 0;
- virtual void asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscribeRequest::CreateOrAttach mode, const OperationCallbackPtr& callback) = 0;
- virtual void subscribe(const std::string& topic, const std::string& subscriberId, const SubscriptionOptions& options) = 0;
- virtual void asyncSubscribe(const std::string& topic, const std::string& subscriberId, const SubscriptionOptions& options, const OperationCallbackPtr& callback) = 0;
-
- virtual void unsubscribe(const std::string& topic, const std::string& subscriberId) = 0;
- virtual void asyncUnsubscribe(const std::string& topic, const std::string& subscriberId, const OperationCallbackPtr& callback) = 0;
-
- virtual void consume(const std::string& topic, const std::string& subscriberId, const MessageSeqId& messageSeqId) = 0;
-
- virtual void startDelivery(const std::string& topic, const std::string& subscriberId,
- const MessageHandlerCallbackPtr& callback) = 0;
- virtual void startDeliveryWithFilter(const std::string& topic,
- const std::string& subscriberId,
- const MessageHandlerCallbackPtr& callback,
- const ClientMessageFilterPtr& filter) = 0;
-
- virtual void stopDelivery(const std::string& topic, const std::string& subscriberId) = 0;
-
- virtual bool hasSubscription(const std::string& topic, const std::string& subscriberId) = 0;
- virtual void closeSubscription(const std::string& topic, const std::string& subscriberId) = 0;
- virtual void asyncCloseSubscription(const std::string& topic, const std::string& subscriberId,
- const OperationCallbackPtr& callback) = 0;
-
- //
- // API to register/unregister subscription listeners for receiving
- // events indicating subscription changes for subscriptions that have
- // resubscribe disabled
- //
- virtual void addSubscriptionListener(SubscriptionListenerPtr& listener) = 0;
- virtual void removeSubscriptionListener(SubscriptionListenerPtr& listener) = 0;
-
- virtual ~Subscriber() {}
- };
-};
-
-#endif
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/Makefile.am
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/Makefile.am b/hedwig-client/src/main/cpp/lib/Makefile.am
deleted file mode 100644
index f19a3da..0000000
--- a/hedwig-client/src/main/cpp/lib/Makefile.am
+++ /dev/null
@@ -1,32 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-PROTODEF = ../../../../../hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
-
-lib_LTLIBRARIES = libhedwig01.la
-libhedwig01_la_SOURCES = protocol.cpp channel.cpp client.cpp util.cpp clientimpl.cpp publisherimpl.cpp subscriberimpl.cpp eventdispatcher.cpp data.cpp filterablemessagehandler.cpp simplesubscriberimpl.cpp multiplexsubscriberimpl.cpp
-libhedwig01_la_CPPFLAGS = -I$(top_srcdir)/inc $(DEPS_CFLAGS)
-libhedwig01_la_LIBADD = $(DEPS_LIBS) $(BOOST_CPPFLAGS)
-libhedwig01_la_LDFLAGS = -no-undefined $(BOOST_ASIO_LIB) $(BOOST_LDFLAGS) $(BOOST_THREAD_LIB)
-
-protocol.cpp: $(PROTODEF)
- protoc --cpp_out=. -I`dirname $(PROTODEF)` $(PROTODEF)
- sed "s/PubSubProtocol.pb.h/hedwig\/protocol.h/" PubSubProtocol.pb.cc > protocol.cpp
- rm PubSubProtocol.pb.cc
- mv PubSubProtocol.pb.h $(top_srcdir)/inc/hedwig/protocol.h
-
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/channel.cpp
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/channel.cpp b/hedwig-client/src/main/cpp/lib/channel.cpp
deleted file mode 100644
index b980e53..0000000
--- a/hedwig-client/src/main/cpp/lib/channel.cpp
+++ /dev/null
@@ -1,801 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifdef HAVE_CONFIG_H
-#include <config.h>
-#endif
-
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#include <netinet/tcp.h>
-#include <poll.h>
-#include <iostream>
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
-#include <errno.h>
-#include <vector>
-#include <utility>
-#include <deque>
-#include "channel.h"
-#include "util.h"
-#include "clientimpl.h"
-
-#include <log4cxx/logger.h>
-#include <google/protobuf/io/zero_copy_stream_impl.h>
-
-static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig."__FILE__));
-
-using namespace Hedwig;
-
-const std::string DEFAULT_SSL_PEM_FILE = "";
-
-AbstractDuplexChannel::AbstractDuplexChannel(IOServicePtr& service,
- const HostAddress& addr,
- const ChannelHandlerPtr& handler)
- : address(addr), handler(handler), service(service->getService()),
- instream(&in_buf), copy_buf(NULL), copy_buf_length(0),
- state(UNINITIALISED), receiving(false), reading(false), sending(false),
- closed(false)
-{}
-
-AbstractDuplexChannel::~AbstractDuplexChannel() {
- free(copy_buf);
- copy_buf = NULL;
- copy_buf_length = 0;
-
- LOG4CXX_INFO(logger, "Destroying DuplexChannel(" << this << ")");
-}
-
-ChannelHandlerPtr AbstractDuplexChannel::getChannelHandler() {
- return handler;
-}
-
-/*static*/ void AbstractDuplexChannel::connectCallbackHandler(
- AbstractDuplexChannelPtr channel,
- OperationCallbackPtr callback,
- const boost::system::error_code& error) {
- channel->doAfterConnect(callback, error);
-}
-
-void AbstractDuplexChannel::connect() {
- connect(OperationCallbackPtr());
-}
-
-void AbstractDuplexChannel::connect(const OperationCallbackPtr& callback) {
- setState(CONNECTING);
- doConnect(callback);
-}
-
-void AbstractDuplexChannel::doAfterConnect(const OperationCallbackPtr& callback,
- const boost::system::error_code& error) {
- if (error) {
- LOG4CXX_ERROR(logger, "Channel " << this << " connect error : " << error.message().c_str());
- channelConnectFailed(ChannelConnectException(), callback);
- return;
- }
-
- // set no delay option
- boost::system::error_code ec;
- setSocketOption(ec);
- if (ec) {
- LOG4CXX_WARN(logger, "Channel " << this << " set up socket error : " << ec.message().c_str());
- channelConnectFailed(ChannelSetupException(), callback);
- return;
- }
-
- boost::asio::ip::tcp::endpoint localEp;
- boost::asio::ip::tcp::endpoint remoteEp;
- localEp = getLocalAddress(ec);
- remoteEp = getRemoteAddress(ec);
-
- if (!ec) {
- LOG4CXX_INFO(logger, "Channel " << this << " connected :"
- << localEp.address().to_string() << ":" << localEp.port() << "=>"
- << remoteEp.address().to_string() << ":" << remoteEp.port());
- // update ip address since it might connect to a VIP
- address.updateIP(remoteEp.address().to_v4().to_ulong());
- }
- // the channel is connected
- channelConnected(callback);
-}
-
-void AbstractDuplexChannel::channelConnectFailed(const std::exception& e,
- const OperationCallbackPtr& callback) {
- channelDisconnected(e);
- setState(DEAD);
- if (callback.get()) {
- callback->operationFailed(e);
- }
-}
-
-void AbstractDuplexChannel::channelConnected(const OperationCallbackPtr& callback) {
- // for a normal channel, we are done here
- setState(CONNECTED);
- if (callback.get()) {
- callback->operationComplete();
- }
-
- // enable sending & receiving
- startSending();
- startReceiving();
-}
-
-/*static*/ void AbstractDuplexChannel::messageReadCallbackHandler(
- AbstractDuplexChannelPtr channel,
- std::size_t message_size,
- const boost::system::error_code& error,
- std::size_t bytes_transferred) {
- LOG4CXX_DEBUG(logger, "DuplexChannel::messageReadCallbackHandler " << error << ", "
- << bytes_transferred << " channel(" << channel.get() << ")");
-
- if (error) {
- if (!channel->isClosed()) {
- LOG4CXX_INFO(logger, "Invalid read error (" << error << ") bytes_transferred ("
- << bytes_transferred << ") channel(" << channel.get() << ")");
- }
- channel->channelDisconnected(ChannelReadException());
- return;
- }
-
- if (channel->copy_buf_length < message_size) {
- channel->copy_buf_length = message_size;
- channel->copy_buf = (char*)realloc(channel->copy_buf, channel->copy_buf_length);
- if (channel->copy_buf == NULL) {
- LOG4CXX_ERROR(logger, "Error allocating buffer. channel(" << channel.get() << ")");
- // if we failed to realloc memory, we should disconnect the channel.
- // it would then enter the disconnect logic, which would close the channel and release
- // its resources, including the copy_buf memory.
- channel->channelDisconnected(ChannelOutOfMemoryException());
- return;
- }
- }
-
- channel->instream.read(channel->copy_buf, message_size);
- PubSubResponsePtr response(new PubSubResponse());
- bool err = response->ParseFromArray(channel->copy_buf, message_size);
-
- if (!err) {
- LOG4CXX_ERROR(logger, "Error parsing message. channel(" << channel.get() << ")");
- channel->channelDisconnected(ChannelReadException());
- return;
- } else {
- LOG4CXX_DEBUG(logger, "channel(" << channel.get() << ") : " << channel->in_buf.size()
- << " bytes left in buffer");
- }
-
- ChannelHandlerPtr h;
- {
- boost::shared_lock<boost::shared_mutex> lock(channel->destruction_lock);
- if (channel->handler.get()) {
- h = channel->handler;
- }
- }
-
- // if the channel has stopped receiving, we should not call #messageReceived.
- // store this response in outstanding_response and stop reading;
- // when we startReceiving again, we can process this last response.
- {
- boost::lock_guard<boost::mutex> lock(channel->receiving_lock);
- if (!channel->isReceiving()) {
- // queue the response
- channel->outstanding_response = response;
- channel->reading = false;
- return;
- }
- }
-
- // channel is still in receiving status
- if (h.get()) {
- h->messageReceived(channel, response);
- }
-
- AbstractDuplexChannel::readSize(channel);
-}
-
-/*static*/ void AbstractDuplexChannel::sizeReadCallbackHandler(
- AbstractDuplexChannelPtr channel,
- const boost::system::error_code& error,
- std::size_t bytes_transferred) {
- LOG4CXX_DEBUG(logger, "DuplexChannel::sizeReadCallbackHandler " << error << ", "
- << bytes_transferred << " channel(" << channel.get() << ")");
-
- if (error) {
- if (!channel->isClosed()) {
- LOG4CXX_INFO(logger, "Invalid read error (" << error << ") bytes_transferred ("
- << bytes_transferred << ") channel(" << channel.get() << ")");
- }
- channel->channelDisconnected(ChannelReadException());
- return;
- }
-
- if (channel->in_buf.size() < sizeof(uint32_t)) {
- LOG4CXX_ERROR(logger, "Not enough data in stream. Must have been an error reading. "
- << " Closing channel(" << channel.get() << ")");
- channel->channelDisconnected(ChannelReadException());
- return;
- }
-
- uint32_t size;
- std::istream is(&channel->in_buf);
- is.read((char*)&size, sizeof(uint32_t));
- size = ntohl(size);
-
- int toread = size - channel->in_buf.size();
- LOG4CXX_DEBUG(logger, " size of incoming message " << size << ", currently in buffer "
- << channel->in_buf.size() << " channel(" << channel.get() << ")");
- if (toread <= 0) {
- AbstractDuplexChannel::messageReadCallbackHandler(channel, size, error, 0);
- } else {
- channel->readMsgBody(channel->in_buf, toread, size);
- }
-}
-
-/*static*/ void AbstractDuplexChannel::readSize(AbstractDuplexChannelPtr channel) {
- int toread = sizeof(uint32_t) - channel->in_buf.size();
- LOG4CXX_DEBUG(logger, " size of incoming message " << sizeof(uint32_t)
- << ", currently in buffer " << channel->in_buf.size()
- << " channel(" << channel.get() << ")");
-
- if (toread < 0) {
- AbstractDuplexChannel::sizeReadCallbackHandler(channel, boost::system::error_code(), 0);
- } else {
- channel->readMsgSize(channel->in_buf);
- }
-}
-
-void AbstractDuplexChannel::startReceiving() {
- LOG4CXX_DEBUG(logger, "DuplexChannel::startReceiving channel(" << this
- << ") currently receiving = " << receiving);
-
- PubSubResponsePtr response;
- bool inReadingState;
- {
- boost::lock_guard<boost::mutex> lock(receiving_lock);
- // already receiving, so just return
- if (receiving) {
- return;
- }
- receiving = true;
-
- // if a response was queued while receiving was stopped, we need to
- // process it, but it must be processed outside receiving_lock,
- // otherwise we could enter a deadlock:
- // subscriber#startDelivery(subscriber#queue_lock) =>
- // channel#startReceiving(channel#receiving_lock) =>
- // subscriber#messageReceived(subscriber#queue_lock)
- if (outstanding_response.get()) {
- response = outstanding_response;
- outstanding_response = PubSubResponsePtr();
- }
-
- // if the channel is already in reading state, waiting for data from the
- // remote server, we don't need to issue another readSize op
- inReadingState = reading;
- if (!reading) {
- reading = true;
- }
- }
-
- // consume message buffered in receiving queue
- // there is at most one message buffered when we
- // stopReceiving between #readSize and #readMsgBody
- if (response.get()) {
- ChannelHandlerPtr h;
- {
- boost::shared_lock<boost::shared_mutex> lock(this->destruction_lock);
- if (this->handler.get()) {
- h = this->handler;
- }
- }
- if (h.get()) {
- h->messageReceived(shared_from_this(), response);
- }
- }
-
- // if channel is not in reading state, #readSize
- if (!inReadingState) {
- AbstractDuplexChannel::readSize(shared_from_this());
- }
-}
-
-bool AbstractDuplexChannel::isReceiving() {
- return receiving;
-}
-
-bool AbstractDuplexChannel::isClosed() {
- return closed;
-}
-
-void AbstractDuplexChannel::stopReceiving() {
- LOG4CXX_DEBUG(logger, "DuplexChannel::stopReceiving channel(" << this << ")");
-
- boost::lock_guard<boost::mutex> lock(receiving_lock);
- receiving = false;
-}
-
-void AbstractDuplexChannel::startSending() {
- {
- boost::shared_lock<boost::shared_mutex> lock(state_lock);
- if (state != CONNECTED) {
- return;
- }
- }
-
- boost::lock_guard<boost::mutex> lock(sending_lock);
- if (sending) {
- return;
- }
- LOG4CXX_DEBUG(logger, "AbstractDuplexChannel::startSending channel(" << this << ")");
-
- WriteRequest w;
- {
- boost::lock_guard<boost::mutex> lock(write_lock);
- if (write_queue.empty()) {
- return;
- }
- w = write_queue.front();
- write_queue.pop_front();
- }
-
- sending = true;
-
- std::ostream os(&out_buf);
- uint32_t size = htonl(w.first->ByteSize());
- os.write((char*)&size, sizeof(uint32_t));
-
- bool err = w.first->SerializeToOstream(&os);
- if (!err) {
- w.second->operationFailed(ChannelWriteException());
- channelDisconnected(ChannelWriteException());
- return;
- }
-
- writeBuffer(out_buf, w.second);
-}
-
-const HostAddress& AbstractDuplexChannel::getHostAddress() const {
- return address;
-}
-
-void AbstractDuplexChannel::channelDisconnected(const std::exception& e) {
- setState(DEAD);
-
- {
- boost::lock_guard<boost::mutex> lock(write_lock);
- while (!write_queue.empty()) {
- WriteRequest w = write_queue.front();
- write_queue.pop_front();
- w.second->operationFailed(e);
- }
- }
-
- ChannelHandlerPtr h;
- {
- boost::shared_lock<boost::shared_mutex> lock(destruction_lock);
- if (handler.get()) {
- h = handler;
- }
- }
- if (h.get()) {
- h->channelDisconnected(shared_from_this(), e);
- }
-}
-
-void AbstractDuplexChannel::close() {
- {
- boost::shared_lock<boost::shared_mutex> statelock(state_lock);
- state = DEAD;
- }
-
- {
- boost::lock_guard<boost::shared_mutex> lock(destruction_lock);
- if (closed) {
- // someone has already closed the socket.
- return;
- }
- closed = true;
- handler = ChannelHandlerPtr(); // clear the handler in case it ever referenced the channel*/
- }
-
- LOG4CXX_INFO(logger, "Killing duplex channel (" << this << ")");
-
- // If we are going away, fail all transactions that haven't been completed
- failAllTransactions();
- closeSocket();
-}
-
-// Completion handler for an asynchronous write.
-// Success: completes the callback, drops the written bytes from out_buf,
-// clears the `sending` flag and kicks off the next queued write.
-// Failure: fails the callback and reports the channel as disconnected.
-/*static*/ void AbstractDuplexChannel::writeCallbackHandler(
-    AbstractDuplexChannelPtr channel,
-    OperationCallbackPtr callback,
-    const boost::system::error_code& error,
-    std::size_t bytes_transferred) {
-  if (!error) {
-    callback->operationComplete();
-    channel->out_buf.consume(bytes_transferred);
-    {
-      boost::lock_guard<boost::mutex> sendGuard(channel->sending_lock);
-      channel->sending = false;
-    }
-    channel->startSending();
-    return;
-  }
-
-  // Error path. The debug log is skipped once the channel has been closed.
-  if (!channel->isClosed()) {
-    LOG4CXX_DEBUG(logger, "AbstractDuplexChannel::writeCallbackHandler " << error << ", "
-                  << bytes_transferred << " channel(" << channel.get() << ")");
-  }
-  callback->operationFailed(ChannelWriteException());
-  channel->channelDisconnected(ChannelWriteException());
-}
-
-// Queues request `m` for sending and triggers startSending(). If the channel
-// is neither CONNECTED nor CONNECTING the callback is failed with
-// UninitialisedChannelException and nothing is queued.
-void AbstractDuplexChannel::writeRequest(const PubSubRequestPtr& m,
-                                         const OperationCallbackPtr& callback) {
-  {
-    boost::shared_lock<boost::shared_mutex> stateGuard(state_lock);
-    const bool writable = (state == CONNECTED) || (state == CONNECTING);
-    if (!writable) {
-      LOG4CXX_ERROR(logger,"Tried to write transaction [" << m->txnid() << "] to a channel ["
-                    << this << "] which is " << (state == DEAD ? "DEAD" : "UNINITIALISED"));
-      callback->operationFailed(UninitialisedChannelException());
-      return;
-    }
-  }
-
-  {
-    boost::lock_guard<boost::mutex> queueGuard(write_lock);
-    write_queue.push_back(WriteRequest(m, callback));
-  }
-
-  startSending();
-}
-
-//
-// Transaction operations
-//
-
-/**
-   Record a request's data under its transaction id; an existing entry with
-   the same id is overwritten.
-*/
-void AbstractDuplexChannel::storeTransaction(const PubSubDataPtr& data) {
-  LOG4CXX_DEBUG(logger, "Storing txnid(" << data->getTxnId() << ") for channel(" << this << ")");
-
-  boost::lock_guard<boost::mutex> mapGuard(txnid2data_lock);
-  PubSubDataPtr& slot = txnid2data[data->getTxnId()];
-  slot = data;
-}
-
-/**
-   Give the transaction back to the caller and remove it from the channel.
-   Returns an empty pointer (and logs an error) when `txnid` is unknown.
-*/
-PubSubDataPtr AbstractDuplexChannel::retrieveTransaction(long txnid) {
-  boost::lock_guard<boost::mutex> lock(txnid2data_lock);
-
-  // Look the id up with find(): operator[] would insert a default entry for
-  // an unknown id only for it to be erased again.
-  PubSubDataPtr data;
-  TransactionMap::iterator found = txnid2data.find(txnid);
-  if (found != txnid2data.end()) {
-    data = found->second;
-    txnid2data.erase(found);
-  }
-  if (data.get() == NULL) {
-    LOG4CXX_ERROR(logger, "Transaction txnid(" << txnid
-                  << ") doesn't exist in channel (" << this << ")");
-  }
-
-  return data;
-}
-
-// Fails every stored transaction with ChannelDiedException and forgets them.
-void AbstractDuplexChannel::failAllTransactions() {
-  // Move the stored transactions out while holding the lock, then run the
-  // callbacks without it: txnid2data_lock is a non-recursive boost::mutex, so
-  // a callback that called storeTransaction()/retrieveTransaction() on this
-  // channel while the lock was held would deadlock.
-  TransactionMap pending;
-  {
-    boost::lock_guard<boost::mutex> lock(txnid2data_lock);
-    pending.swap(txnid2data);
-  }
-
-  for (TransactionMap::iterator iter = pending.begin(); iter != pending.end(); ++iter) {
-    PubSubDataPtr& data = (*iter).second;
-    data->getCallback()->operationFailed(ChannelDiedException());
-  }
-}
-
-// Updates the channel state while holding state_lock exclusively.
-void AbstractDuplexChannel::setState(State s) {
-  boost::lock_guard<boost::shared_mutex> exclusive(state_lock);
-  this->state = s;
-}
-
-//
-// Basic Asio Channel Implementation
-//
-
-// Creates a plain TCP channel; the socket is bound to the channel's io_service.
-AsioDuplexChannel::AsioDuplexChannel(IOServicePtr& service,
-                                     const HostAddress& addr,
-                                     const ChannelHandlerPtr& handler)
-  : AbstractDuplexChannel(service, addr, handler) {
-  socket.reset(new boost_socket(getService()));
-  LOG4CXX_DEBUG(logger, "Creating DuplexChannel(" << this << ")");
-}
-
-// Nothing to do explicitly; members clean up after themselves.
-AsioDuplexChannel::~AsioDuplexChannel() {}
-
-// Starts an asynchronous TCP connect to address.ip():address.port().
-// connectCallbackHandler is invoked with `callback` when the attempt finishes.
-void AsioDuplexChannel::doConnect(const OperationCallbackPtr& callback) {
-  // (An unused local error_code that was never read has been dropped.)
-  uint32_t ip2conn = address.ip();
-  uint16_t port2conn = address.port();
-  boost::asio::ip::tcp::endpoint endp(boost::asio::ip::address_v4(ip2conn), port2conn);
-
-  socket->async_connect(endp, boost::bind(&AbstractDuplexChannel::connectCallbackHandler,
-                                          shared_from_this(), callback,
-                                          boost::asio::placeholders::error));
-  LOG4CXX_INFO(logger, "Channel (" << this << ") fire connect operation to ip ("
-               << ip2conn << ") port (" << port2conn << ")");
-}
-
-// Enables TCP_NODELAY on the socket; any failure is reported through `ec`.
-void AsioDuplexChannel::setSocketOption(boost::system::error_code& ec) {
-  socket->set_option(boost::asio::ip::tcp::no_delay(true), ec);
-}
-
-// Local endpoint of the socket; errors are reported through `ec`.
-boost::asio::ip::tcp::endpoint AsioDuplexChannel::getLocalAddress(
-    boost::system::error_code& ec) {
-  boost::asio::ip::tcp::endpoint local = socket->local_endpoint(ec);
-  return local;
-}
-
-// Remote endpoint of the socket; errors are reported through `ec`.
-boost::asio::ip::tcp::endpoint AsioDuplexChannel::getRemoteAddress(
-    boost::system::error_code& ec) {
-  boost::asio::ip::tcp::endpoint remote = socket->remote_endpoint(ec);
-  return remote;
-}
-
-// Asynchronously writes `buffer` to the socket; writeCallbackHandler receives
-// the channel, `callback`, the error code and the number of bytes written.
-void AsioDuplexChannel::writeBuffer(boost::asio::streambuf& buffer,
-                                    const OperationCallbackPtr& callback) {
-  AbstractDuplexChannelPtr self = shared_from_this();
-  boost::asio::async_write(
-      *socket, buffer,
-      boost::bind(&AbstractDuplexChannel::writeCallbackHandler,
-                  self, callback,
-                  boost::asio::placeholders::error,
-                  boost::asio::placeholders::bytes_transferred));
-}
-
-// Asynchronously reads at least sizeof(uint32_t) bytes into `buffer`, then
-// hands over to sizeReadCallbackHandler.
-void AsioDuplexChannel::readMsgSize(boost::asio::streambuf& buffer) {
-  AbstractDuplexChannelPtr self = shared_from_this();
-  boost::asio::async_read(
-      *socket, buffer, boost::asio::transfer_at_least(sizeof(uint32_t)),
-      boost::bind(&AbstractDuplexChannel::sizeReadCallbackHandler,
-                  self,
-                  boost::asio::placeholders::error,
-                  boost::asio::placeholders::bytes_transferred));
-}
-
-// Asynchronously reads at least `toread` more bytes into `buffer`, then hands
-// over to messageReadCallbackHandler together with `msgSize`.
-void AsioDuplexChannel::readMsgBody(boost::asio::streambuf& buffer,
-                                    int toread, uint32_t msgSize) {
-  AbstractDuplexChannelPtr self = shared_from_this();
-  boost::asio::async_read(
-      *socket, buffer, boost::asio::transfer_at_least(toread),
-      boost::bind(&AbstractDuplexChannel::messageReadCallbackHandler,
-                  self, msgSize,
-                  boost::asio::placeholders::error,
-                  boost::asio::placeholders::bytes_transferred));
-}
-
-// Tears down the TCP socket in three steps: cancel outstanding I/O, shut down
-// both directions, close. A failing step is logged as a warning and the
-// remaining steps still run.
-void AsioDuplexChannel::closeSocket() {
-  boost_socket& sock = *socket;
-  boost::system::error_code ec;
-
-  sock.cancel(ec);
-  if (ec) {
-    LOG4CXX_WARN(logger, "Channel " << this << " canceling io error : " << ec.message().c_str());
-  }
-
-  sock.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
-  if (ec) {
-    LOG4CXX_WARN(logger, "Channel " << this << " shutdown error : " << ec.message().c_str());
-  }
-
-  sock.close(ec);
-  if (ec) {
-    LOG4CXX_WARN(logger, "Channel " << this << " close error : " << ec.message().c_str());
-  }
-
-  LOG4CXX_DEBUG(logger, "Closed socket for channel " << this << ".");
-}
-
-// SSL Context Factory
-
-// Keeps a reference to the configuration and reads the PEM file path from it
-// (Configuration::SSL_PEM_FILE, falling back to DEFAULT_SSL_PEM_FILE).
-SSLContextFactory::SSLContextFactory(const Configuration& conf)
-  : conf(conf),
-    sslPemFile(conf.get(Configuration::SSL_PEM_FILE, DEFAULT_SSL_PEM_FILE)) {}
-
-SSLContextFactory::~SSLContextFactory() {
-  // nothing to release explicitly
-}
-
-// Builds an sslv23 client context. When a PEM file is configured it is loaded
-// as the verify file; a load failure is logged and reported by throwing
-// InvalidSSLPermFileException.
-// NOTE(review): verify mode is set to verify_none even when a verify file is
-// loaded, so the peer certificate does not appear to be checked - confirm
-// whether this is intended.
-boost_ssl_context_ptr SSLContextFactory::createSSLContext(boost::asio::io_service& service) {
-  boost_ssl_context_ptr sslCtx(
-      new boost_ssl_context(service, boost::asio::ssl::context::sslv23_client));
-  sslCtx->set_verify_mode(boost::asio::ssl::context::verify_none);
-
-  if (sslPemFile.empty()) {
-    return sslCtx;
-  }
-
-  boost::system::error_code err;
-  sslCtx->load_verify_file(sslPemFile, err);
-  if (err) {
-    LOG4CXX_ERROR(logger, "Failed to load verify ssl pem file : "
-                  << sslPemFile);
-    throw InvalidSSLPermFileException();
-  }
-  return sslCtx;
-}
-
-//
-// SSL Channel Implementation
-//
-
-// Creates an SSL channel whose stream is bound to the channel's io_service and
-// the given SSL context. The `sslclosed` flag only exists on non-Apple builds,
-// hence the conditional initializer.
-AsioSSLDuplexChannel::AsioSSLDuplexChannel(IOServicePtr& service,
-                                           const boost_ssl_context_ptr& sslCtx,
-                                           const HostAddress& addr,
-                                           const ChannelHandlerPtr& handler)
-  : AbstractDuplexChannel(service, addr, handler), ssl_ctx(sslCtx)
-#ifndef __APPLE__
-  , sslclosed(false)
-#endif
-{
-  ssl_socket.reset(new boost_ssl_socket(getService(), *ssl_ctx));
-  LOG4CXX_DEBUG(logger, "Created SSL DuplexChannel(" << this << ")");
-}
-
-// Nothing to do explicitly; members clean up after themselves.
-AsioSSLDuplexChannel::~AsioSSLDuplexChannel() {}
-
-// Starts an asynchronous TCP connect of the SSL stream's lowest layer to
-// address.ip():address.sslPort(). connectCallbackHandler is invoked with
-// `callback` when the attempt finishes.
-void AsioSSLDuplexChannel::doConnect(const OperationCallbackPtr& callback) {
-  // (An unused local error_code that was never read has been dropped.)
-  uint32_t ip2conn = address.ip();
-  uint16_t port2conn = address.sslPort();
-  boost::asio::ip::tcp::endpoint endp(boost::asio::ip::address_v4(ip2conn), port2conn);
-
-  ssl_socket->lowest_layer().async_connect(endp,
-      boost::bind(&AbstractDuplexChannel::connectCallbackHandler,
-                  shared_from_this(), callback,
-                  boost::asio::placeholders::error));
-  LOG4CXX_INFO(logger, "SSL Channel (" << this << ") fire connect operation to ip ("
-               << ip2conn << ") port (" << port2conn << ")");
-}
-
-// Enables TCP_NODELAY on the underlying TCP layer; failures go to `ec`.
-void AsioSSLDuplexChannel::setSocketOption(boost::system::error_code& ec) {
-  ssl_socket->lowest_layer().set_option(boost::asio::ip::tcp::no_delay(true), ec);
-}
-
-// Local endpoint of the underlying TCP layer; errors are reported through `ec`.
-boost::asio::ip::tcp::endpoint AsioSSLDuplexChannel::getLocalAddress(
-    boost::system::error_code& ec) {
-  boost::asio::ip::tcp::endpoint local = ssl_socket->lowest_layer().local_endpoint(ec);
-  return local;
-}
-
-// Remote endpoint of the underlying TCP layer; errors are reported through `ec`.
-boost::asio::ip::tcp::endpoint AsioSSLDuplexChannel::getRemoteAddress(
-    boost::system::error_code& ec) {
-  boost::asio::ip::tcp::endpoint remote = ssl_socket->lowest_layer().remote_endpoint(ec);
-  return remote;
-}
-
-// TCP connect finished: an SSL channel is not usable yet, so start the SSL
-// handshake first instead of reporting the connection right away.
-void AsioSSLDuplexChannel::channelConnected(const OperationCallbackPtr& callback) {
-  this->startHandShake(callback);
-  LOG4CXX_INFO(logger, "SSL Channel " << this << " fire hand shake operation");
-}
-
-// Handshake succeeded: log it, then run the base-class "connected" handling.
-void AsioSSLDuplexChannel::sslChannelConnected(const OperationCallbackPtr& callback) {
-  LOG4CXX_INFO(logger, "SSL Channel " << this << " hand shake finish!!");
-  this->AbstractDuplexChannel::channelConnected(callback);
-}
-
-// Starts the asynchronous client-side SSL handshake; handleHandshake receives
-// this channel, `callback` and the resulting error code.
-void AsioSSLDuplexChannel::startHandShake(const OperationCallbackPtr& callback) {
-  AsioSSLDuplexChannelPtr self =
-      boost::dynamic_pointer_cast<AsioSSLDuplexChannel>(shared_from_this());
-  ssl_socket->async_handshake(
-      boost::asio::ssl::stream_base::client,
-      boost::bind(&AsioSSLDuplexChannel::handleHandshake,
-                  self, callback, boost::asio::placeholders::error));
-}
-
-// Handshake completion handler: on success the channel is reported as
-// connected; on error the failure is logged and reported through
-// channelConnectFailed with a ChannelConnectException.
-void AsioSSLDuplexChannel::handleHandshake(AsioSSLDuplexChannelPtr channel,
-                                           OperationCallbackPtr callback,
-                                           const boost::system::error_code& error) {
-  if (!error) {
-    channel->sslChannelConnected(callback);
-    return;
-  }
-
-  LOG4CXX_ERROR(logger, "SSL Channel " << channel.get() << " hand shake error : "
-                << error.message().c_str());
-  channel->channelConnectFailed(ChannelConnectException(), callback);
-}
-
-// Asynchronously writes `buffer` through the SSL stream; writeCallbackHandler
-// receives the channel, `callback`, the error code and the bytes written.
-void AsioSSLDuplexChannel::writeBuffer(boost::asio::streambuf& buffer,
-                                       const OperationCallbackPtr& callback) {
-  AbstractDuplexChannelPtr self = shared_from_this();
-  boost::asio::async_write(
-      *ssl_socket, buffer,
-      boost::bind(&AbstractDuplexChannel::writeCallbackHandler,
-                  self, callback,
-                  boost::asio::placeholders::error,
-                  boost::asio::placeholders::bytes_transferred));
-}
-
-// Asynchronously reads at least sizeof(uint32_t) bytes from the SSL stream
-// into `buffer`, then hands over to sizeReadCallbackHandler.
-void AsioSSLDuplexChannel::readMsgSize(boost::asio::streambuf& buffer) {
-  AbstractDuplexChannelPtr self = shared_from_this();
-  boost::asio::async_read(
-      *ssl_socket, buffer, boost::asio::transfer_at_least(sizeof(uint32_t)),
-      boost::bind(&AbstractDuplexChannel::sizeReadCallbackHandler,
-                  self,
-                  boost::asio::placeholders::error,
-                  boost::asio::placeholders::bytes_transferred));
-}
-
-// Asynchronously reads at least `toread` more bytes from the SSL stream into
-// `buffer`, then hands over to messageReadCallbackHandler with `msgSize`.
-void AsioSSLDuplexChannel::readMsgBody(boost::asio::streambuf& buffer,
-                                       int toread, uint32_t msgSize) {
-  AbstractDuplexChannelPtr self = shared_from_this();
-  boost::asio::async_read(
-      *ssl_socket, buffer, boost::asio::transfer_at_least(toread),
-      boost::bind(&AbstractDuplexChannel::messageReadCallbackHandler,
-                  self, msgSize,
-                  boost::asio::placeholders::error,
-                  boost::asio::placeholders::bytes_transferred));
-}
-
-#ifndef __APPLE__
-// boost asio doesn't provide a timeout mechanism for shutting down ssl, so the
-// shutdown is started asynchronously here and closeSocket() bounds the wait.
-// handleSSLShutdown is invoked with the resulting error code.
-void AsioSSLDuplexChannel::sslShutdown() {
-  // boost::dynamic_pointer_cast is the supported spelling (and what
-  // startHandShake uses); boost::shared_dynamic_cast is deprecated and absent
-  // from newer Boost releases.
-  ssl_socket->async_shutdown(boost::bind(&AsioSSLDuplexChannel::handleSSLShutdown,
-                                         boost::dynamic_pointer_cast<AsioSSLDuplexChannel>(shared_from_this()),
-                                         boost::asio::placeholders::error));
-}
-
-void AsioSSLDuplexChannel::handleSSLShutdown(const boost::system::error_code& error) {
- if (error) {
- LOG4CXX_ERROR(logger, "SSL Channel " << this << " shutdown error : "
- << error.message().c_str());
- }
- {
- boost::lock_guard<boost::mutex> lock(sslclosed_lock);
- sslclosed = true;
- }
- sslclosed_cond.notify_all();
-}
-#endif
-
-// Shuts the SSL layer down (non-Apple builds only), waiting at most one second
-// for the asynchronous shutdown to be acknowledged, then closes the lowest
-// (TCP) layer.
-void AsioSSLDuplexChannel::closeSocket() {
-#ifndef __APPLE__
-  // Shutdown ssl
-  sslShutdown();
-  // Bounded wait against an absolute deadline. The flag is re-checked after
-  // every wakeup so that a spurious wakeup cannot end the wait early, while
-  // the total wait still never exceeds one second.
-  {
-    boost::mutex::scoped_lock lock(sslclosed_lock);
-    const boost::system_time deadline =
-        boost::get_system_time() + boost::posix_time::milliseconds(1000);
-    while (!sslclosed) {
-      if (!sslclosed_cond.timed_wait(lock, deadline)) {
-        break;  // deadline reached without the shutdown being acknowledged
-      }
-    }
-  }
-#endif
-  closeLowestLayer();
-}
-
-// Tears down the TCP layer under the SSL stream in three steps: cancel
-// outstanding I/O, shut down both directions, close. A failing step is logged
-// as a warning and the remaining steps still run.
-void AsioSSLDuplexChannel::closeLowestLayer() {
-  boost_ssl_socket::lowest_layer_type& tcp = ssl_socket->lowest_layer();
-  boost::system::error_code ec;
-
-  tcp.cancel(ec);
-  if (ec) {
-    LOG4CXX_WARN(logger, "Channel " << this << " canceling io error : " << ec.message().c_str());
-  }
-
-  tcp.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
-  if (ec) {
-    LOG4CXX_WARN(logger, "Channel " << this << " shutdown error : " << ec.message().c_str());
-  }
-
-  tcp.close(ec);
-  if (ec) {
-    LOG4CXX_WARN(logger, "Channel " << this << " close error : " << ec.message().c_str());
-  }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/channel.h
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/channel.h b/hedwig-client/src/main/cpp/lib/channel.h
deleted file mode 100644
index c9ef289..0000000
--- a/hedwig-client/src/main/cpp/lib/channel.h
+++ /dev/null
@@ -1,438 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef HEDWIG_CHANNEL_H
-#define HEDWIG_CHANNEL_H
-
-#include <hedwig/protocol.h>
-#include <hedwig/callback.h>
-#include <hedwig/client.h>
-#include "util.h"
-#include "data.h"
-#include "eventdispatcher.h"
-
-#ifdef USE_BOOST_TR1
-#include <boost/tr1/memory.hpp>
-#include <boost/tr1/unordered_map.hpp>
-#else
-#include <tr1/memory>
-#include <tr1/unordered_map>
-#endif
-
-#include <google/protobuf/io/zero_copy_stream_impl.h>
-
-#include <boost/shared_ptr.hpp>
-#include <boost/enable_shared_from_this.hpp>
-
-#include <boost/asio.hpp>
-#include <boost/asio/ip/tcp.hpp>
-#include <boost/asio/ssl.hpp>
-#include <boost/function.hpp>
-#include <boost/thread/mutex.hpp>
-#include <boost/thread/shared_mutex.hpp>
-
-namespace Hedwig {
- class ChannelException : public std::exception { }; // common base of the channel errors below
- class UninitialisedChannelException : public ChannelException {}; // reported by writeRequest when the channel is neither CONNECTED nor CONNECTING
-
- class ChannelConnectException : public ChannelException {}; // e.g. reported when the SSL handshake fails
- class CannotCreateSocketException : public ChannelConnectException {};
- class ChannelSetupException : public ChannelConnectException {};
- class ChannelNotConnectedException : public ChannelConnectException {};
-
- class ChannelDiedException : public ChannelException {}; // handed to stored transactions by failAllTransactions
-
- class ChannelWriteException : public ChannelException {}; // reported when an asynchronous write fails
- class ChannelReadException : public ChannelException {};
- class ChannelThreadException : public ChannelException {};
- class ChannelOutOfMemoryException : public ChannelException {};
-
- class InvalidSSLPermFileException : public std::exception {}; // thrown by SSLContextFactory::createSSLContext when the PEM verify file cannot be loaded; not a ChannelException
-
- class DuplexChannel;
- typedef boost::shared_ptr<DuplexChannel> DuplexChannelPtr;
- typedef boost::asio::ip::tcp::socket boost_socket; // plain TCP socket
- typedef boost::shared_ptr<boost_socket> boost_socket_ptr;
- typedef boost::asio::ssl::stream<boost_socket> boost_ssl_socket; // SSL stream layered on a TCP socket
- typedef boost::shared_ptr<boost_ssl_socket> boost_ssl_socket_ptr;
-
- class ChannelHandler { // abstract receiver of channel events; every callback gets the originating channel
- public:
- virtual void messageReceived(const DuplexChannelPtr& channel, const PubSubResponsePtr& m) = 0;
- virtual void channelConnected(const DuplexChannelPtr& channel) = 0;
-
- virtual void channelDisconnected(const DuplexChannelPtr& channel, const std::exception& e) = 0; // called by AbstractDuplexChannel::channelDisconnected with the causing exception
- virtual void exceptionOccurred(const DuplexChannelPtr& channel, const std::exception& e) = 0;
-
- virtual ~ChannelHandler() {}
- };
-
- typedef boost::shared_ptr<ChannelHandler> ChannelHandlerPtr;
-
- // Abstract channel interface used to send requests to a host.
- class DuplexChannel {
- public:
- virtual ~DuplexChannel() {}
-
- // Return the channel handler bound to this channel
- virtual ChannelHandlerPtr getChannelHandler() = 0;
-
- // Issues a connect request to the target host.
- // Callers may call writeRequest right after issuing the connect request; those
- // requests should be buffered and written once the channel is connected.
- virtual void connect() = 0;
-
- // Issues a connect request to the target host.
- // Callers may call writeRequest right after issuing the connect request; those
- // requests should be buffered and written once the channel is connected.
- // The provided callback is triggered after the channel is connected.
- virtual void connect(const OperationCallbackPtr& callback) = 0;
-
- // Write the request to the underlying channel.
- // If the channel is not established yet, write requests are buffered
- // until the channel is connected.
- virtual void writeRequest(const PubSubRequestPtr& m,
- const OperationCallbackPtr& callback) = 0;
-
- // Returns the remote address this channel is connected to.
- virtual const HostAddress& getHostAddress() const = 0;
-
- // Resumes the read operation of this channel asynchronously
- virtual void startReceiving() = 0;
-
- // Suspends the read operation of this channel asynchronously
- virtual void stopReceiving() = 0;
-
- // Returns true if and only if the channel will read a message
- virtual bool isReceiving() = 0;
-
- //
- // Transaction operations
- //
-
- // Store a pub/sub request
- virtual void storeTransaction(const PubSubDataPtr& data) = 0;
-
- // Remove a pub/sub request and hand it back to the caller
- virtual PubSubDataPtr retrieveTransaction(long txnid) = 0;
-
- // Fail all transactions
- virtual void failAllTransactions() = 0;
-
- // Handle the case that the channel is disconnected due to issues found
- // when reading or writing
- virtual void channelDisconnected(const std::exception& e) = 0;
-
- // Close the channel to release the resources.
- // Once a channel is closed, it cannot be opened again. Calling this
- // method on a closed channel has no effect.
- virtual void close() = 0;
- };
-
- typedef boost::asio::ssl::context boost_ssl_context;
- typedef boost::shared_ptr<boost_ssl_context> boost_ssl_context_ptr;
-
- class SSLContextFactory { // creates client-side SSL contexts from the client configuration
- public:
- SSLContextFactory(const Configuration& conf); // reads the PEM file path (Configuration::SSL_PEM_FILE) from conf
- ~SSLContextFactory();
-
- boost_ssl_context_ptr createSSLContext(boost::asio::io_service& service); // throws InvalidSSLPermFileException if the configured PEM file cannot be loaded
- private:
- const Configuration& conf; // held by reference, not copied - NOTE(review): presumably the Configuration must outlive the factory; confirm
- std::string sslPemFile; // verify file path; nothing is loaded when it is empty
- };
-
- typedef boost::shared_ptr<SSLContextFactory> SSLContextFactoryPtr;
-
- class AbstractDuplexChannel;
- typedef boost::shared_ptr<AbstractDuplexChannel> AbstractDuplexChannelPtr;
-
- class AbstractDuplexChannel : public DuplexChannel, // socket-independent part of a channel; subclasses supply the actual socket I/O
- public boost::enable_shared_from_this<AbstractDuplexChannel> {
- public:
- AbstractDuplexChannel(IOServicePtr& service,
- const HostAddress& addr,
- const ChannelHandlerPtr& handler);
- virtual ~AbstractDuplexChannel();
-
- virtual ChannelHandlerPtr getChannelHandler();
-
- //
- // Connect Operation
- //
-
- // Asio Connect Callback Handler
- static void connectCallbackHandler(AbstractDuplexChannelPtr channel,
- OperationCallbackPtr callback,
- const boost::system::error_code& error);
- virtual void connect();
- virtual void connect(const OperationCallbackPtr& callback);
-
- //
- // Write Operation
- //
-
- // Asio Write Callback Handler
- static void writeCallbackHandler(AbstractDuplexChannelPtr channel,
- OperationCallbackPtr callback,
- const boost::system::error_code& error,
- std::size_t bytes_transferred);
- // Write request: queue it and start sending; the callback is failed right away
- // when the channel is neither CONNECTED nor CONNECTING
- virtual void writeRequest(const PubSubRequestPtr& m,
- const OperationCallbackPtr& callback);
-
- // get the target host
- virtual const HostAddress& getHostAddress() const;
-
- static void sizeReadCallbackHandler(AbstractDuplexChannelPtr channel,
- const boost::system::error_code& error,
- std::size_t bytes_transferred);
- static void messageReadCallbackHandler(AbstractDuplexChannelPtr channel,
- std::size_t messagesize,
- const boost::system::error_code& error,
- std::size_t bytes_transferred);
- static void readSize(AbstractDuplexChannelPtr channel);
-
- // start receiving responses from underlying channel
- virtual void startReceiving();
- // is the underlying channel in receiving state
- virtual bool isReceiving();
- // stop receiving responses from underlying channel
- virtual void stopReceiving();
-
- // Store a pub/sub request under its transaction id
- virtual void storeTransaction(const PubSubDataPtr& data);
-
- // Remove a pub/sub request and return it (empty pointer if the id is unknown)
- virtual PubSubDataPtr retrieveTransaction(long txnid);
-
- // Fail all stored transactions with ChannelDiedException and forget them
- virtual void failAllTransactions();
-
- // channel is disconnected for a specified exception: marks it DEAD, fails queued writes, notifies the handler
- virtual void channelDisconnected(const std::exception& e);
-
- // close the channel; only the first call does the teardown
- virtual void close();
-
- inline boost::asio::io_service & getService() const {
- return service;
- }
-
- protected:
- // execute the connect operation
- virtual void doConnect(const OperationCallbackPtr& callback) = 0;
-
- virtual void doAfterConnect(const OperationCallbackPtr& callback,
- const boost::system::error_code& error);
-
- // Execute the action after channel connect
- // It would be executed in asio connect callback handler
- virtual void setSocketOption(boost::system::error_code& ec) = 0;
- virtual boost::asio::ip::tcp::endpoint
- getRemoteAddress(boost::system::error_code& ec) = 0;
- virtual boost::asio::ip::tcp::endpoint
- getLocalAddress(boost::system::error_code& ec) = 0;
-
- // Channel failed to connect
- virtual void channelConnectFailed(const std::exception& e,
- const OperationCallbackPtr& callback);
- // Channel connected
- virtual void channelConnected(const OperationCallbackPtr& callback);
-
- // Start sending buffered requests to target host
- void startSending();
-
- // Write a buffer to underlying socket
- virtual void writeBuffer(boost::asio::streambuf& buffer,
- const OperationCallbackPtr& callback) = 0;
-
- // Read a message from underlying socket
- virtual void readMsgSize(boost::asio::streambuf& buffer) = 0;
- virtual void readMsgBody(boost::asio::streambuf& buffer,
- int toread, uint32_t msgSize) = 0;
-
- // is the channel under closing
- bool isClosed();
-
- // close the underlying socket to release resource
- virtual void closeSocket() = 0;
-
- enum State { UNINITIALISED, CONNECTING, CONNECTED, DEAD };
- void setState(State s); // takes state_lock exclusively
-
- // Address
- HostAddress address;
- private:
- ChannelHandlerPtr handler; // reset to an empty pointer by close(); accessed under destruction_lock
-
- boost::asio::io_service &service;
-
- // buffers for input stream
- boost::asio::streambuf in_buf;
- std::istream instream;
-
- // only exists because protobufs can't play nice with streams
- // (if there's more than message len in it, it tries to read all)
- char* copy_buf;
- std::size_t copy_buf_length;
-
- // buffers for output stream
- boost::asio::streambuf out_buf;
- // write requests queue
- typedef std::pair<PubSubRequestPtr, OperationCallbackPtr> WriteRequest;
- boost::mutex write_lock; // guards write_queue
- std::deque<WriteRequest> write_queue;
-
- // channel state
- State state;
- boost::shared_mutex state_lock; // guards state
-
- // reading state
- bool receiving;
- bool reading;
- PubSubResponsePtr outstanding_response;
- boost::mutex receiving_lock;
-
- // sending state
- bool sending;
- boost::mutex sending_lock; // guards sending
-
- // flag indicates the channel is closed
- // some callback might return when closing
- bool closed;
-
- // transactions
- typedef std::tr1::unordered_map<long, PubSubDataPtr> TransactionMap;
-
- TransactionMap txnid2data; // stored requests keyed by transaction id
- boost::mutex txnid2data_lock; // guards txnid2data
- boost::shared_mutex destruction_lock; // guards closed and handler
- };
-
- class AsioDuplexChannel : public AbstractDuplexChannel { // channel over a plain (non-SSL) asio TCP socket
- public:
- AsioDuplexChannel(IOServicePtr& service,
- const HostAddress& addr,
- const ChannelHandlerPtr& handler);
- virtual ~AsioDuplexChannel();
- protected:
- // execute the connect operation (asynchronous connect to address.ip():address.port())
- virtual void doConnect(const OperationCallbackPtr& callback);
-
- // Execute the action after channel connect
- // It would be executed in asio connect callback handler
- virtual void setSocketOption(boost::system::error_code& ec); // enables tcp no_delay
- virtual boost::asio::ip::tcp::endpoint
- getRemoteAddress(boost::system::error_code& ec);
- virtual boost::asio::ip::tcp::endpoint
- getLocalAddress(boost::system::error_code& ec);
-
- // Write a buffer to underlying socket
- virtual void writeBuffer(boost::asio::streambuf& buffer,
- const OperationCallbackPtr& callback);
-
- // Read a message from underlying socket
- virtual void readMsgSize(boost::asio::streambuf& buffer);
- virtual void readMsgBody(boost::asio::streambuf& buffer,
- int toread, uint32_t msgSize);
-
- // close the underlying socket to release resource (cancel, shutdown both directions, close)
- virtual void closeSocket();
- private:
- // underlying socket
- boost_socket_ptr socket;
- };
-
- typedef boost::shared_ptr<AsioDuplexChannel> AsioDuplexChannelPtr;
-
- class AsioSSLDuplexChannel;
- typedef boost::shared_ptr<AsioSSLDuplexChannel> AsioSSLDuplexChannelPtr;
-
- class AsioSSLDuplexChannel : public AbstractDuplexChannel { // channel over an asio SSL stream
- public:
- AsioSSLDuplexChannel(IOServicePtr& service,
- const boost_ssl_context_ptr& sslCtx,
- const HostAddress& addr,
- const ChannelHandlerPtr& handler);
- virtual ~AsioSSLDuplexChannel();
- protected:
- // execute the connect operation (asynchronous connect to address.ip():address.sslPort())
- virtual void doConnect(const OperationCallbackPtr& callback);
- // Execute the action after channel connect
- // It would be executed in asio connect callback handler
- virtual void setSocketOption(boost::system::error_code& ec);
- virtual boost::asio::ip::tcp::endpoint
- getRemoteAddress(boost::system::error_code& ec);
- virtual boost::asio::ip::tcp::endpoint
- getLocalAddress(boost::system::error_code& ec);
-
- virtual void channelConnected(const OperationCallbackPtr& callback); // starts the SSL handshake instead of reporting the connection directly
-
- // Start SSL Hand Shake after the channel is connected
- void startHandShake(const OperationCallbackPtr& callback);
- // Asio Callback After Hand Shake
- static void handleHandshake(AsioSSLDuplexChannelPtr channel,
- OperationCallbackPtr callback,
- const boost::system::error_code& error);
-
- void sslChannelConnected(const OperationCallbackPtr& callback); // called after a successful handshake; runs the base-class channelConnected
-
- // Write a buffer to underlying socket
- virtual void writeBuffer(boost::asio::streambuf& buffer,
- const OperationCallbackPtr& callback);
-
- // Read a message from underlying socket
- virtual void readMsgSize(boost::asio::streambuf& buffer);
- virtual void readMsgBody(boost::asio::streambuf& buffer,
- int toread, uint32_t msgSize);
-
- // close the underlying socket to release resource
- virtual void closeSocket();
-
- private:
-#ifndef __APPLE__
- // Shutdown ssl (asynchronously)
- void sslShutdown();
- // Handle ssl shutdown: sets sslclosed and notifies sslclosed_cond
- void handleSSLShutdown(const boost::system::error_code& error);
-#endif
- // Close lowest layer
- void closeLowestLayer();
-
- // underlying ssl socket
- boost_ssl_socket_ptr ssl_socket;
- // ssl context
- boost_ssl_context_ptr ssl_ctx;
-
-#ifndef __APPLE__
- // Flag indicating that the ssl shutdown has completed.
- bool sslclosed;
- boost::mutex sslclosed_lock; // guards sslclosed
- boost::condition_variable sslclosed_cond; // notified once sslclosed is set
-#endif
- };
-
-
- // Hash functor for DuplexChannelPtr: the hash value is the address of the
- // channel object the pointer refers to.
- struct DuplexChannelPtrHash : public std::unary_function<DuplexChannelPtr, size_t> {
-   size_t operator()(const Hedwig::DuplexChannelPtr& channel) const {
-     const DuplexChannel* const raw = channel.get();
-     return reinterpret_cast<size_t>(raw);
-   }
- };
-};
-#endif
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client/src/main/cpp/lib/client.cpp
----------------------------------------------------------------------
diff --git a/hedwig-client/src/main/cpp/lib/client.cpp b/hedwig-client/src/main/cpp/lib/client.cpp
deleted file mode 100644
index e98c452..0000000
--- a/hedwig-client/src/main/cpp/lib/client.cpp
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifdef HAVE_CONFIG_H
-#include <config.h>
-#endif
-
-#include <hedwig/client.h>
-#include <memory>
-
-#include "clientimpl.h"
-#include <log4cxx/logger.h>
-
-// File-scoped log4cxx logger named "hedwig.<source file>". The whitespace
-// before __FILE__ is required: since C++11 a string literal immediately
-// followed by an identifier is lexed as a user-defined-literal suffix, so
-// "hedwig."__FILE__ no longer concatenates the two strings.
-static log4cxx::LoggerPtr logger(log4cxx::Logger::getLogger("hedwig." __FILE__));
-
-using namespace Hedwig;
-
-// Names of the configuration keys exposed as Configuration constants.
-const std::string Configuration::DEFAULT_SERVER = "hedwig.cpp.default_server";
-const std::string Configuration::MESSAGE_CONSUME_RETRY_WAIT_TIME = "hedwig.cpp.message_consume_retry_wait_time";
-const std::string Configuration::SUBSCRIBER_CONSUME_RETRY_WAIT_TIME = "hedwig.cpp.subscriber_consume_retry_wait_time";
-const std::string Configuration::MAX_MESSAGE_QUEUE_SIZE = "hedwig.cpp.max_msgqueue_size";
-const std::string Configuration::RECONNECT_SUBSCRIBE_RETRY_WAIT_TIME = "hedwig.cpp.reconnect_subscribe_retry_wait_time";
-const std::string Configuration::SYNC_REQUEST_TIMEOUT = "hedwig.cpp.sync_request_timeout";
-const std::string Configuration::SUBSCRIBER_AUTOCONSUME = "hedwig.cpp.subscriber_autoconsume";
-const std::string Configuration::NUM_DISPATCH_THREADS = "hedwig.cpp.num_dispatch_threads";
-const std::string Configuration::SUBSCRIPTION_MESSAGE_BOUND = "hedwig.cpp.subscription_message_bound";
-const std::string Configuration::SSL_ENABLED = "hedwig.cpp.ssl_enabled";
-const std::string Configuration::SSL_PEM_FILE = "hedwig.cpp.ssl_pem";
-const std::string Configuration::SUBSCRIPTION_CHANNEL_SHARING_ENABLED = "hedwig.cpp.subscription_channel_sharing_enabled";
-
-// Builds the client by delegating to ClientImpl::Create with the supplied
-// configuration.
-Client::Client(const Configuration& conf) {
-  LOG4CXX_DEBUG(logger, "Client::Client (" << this << ")");
-  this->clientimpl = ClientImpl::Create(conf);
-}
-
-// Forwards to the implementation object's subscriber.
-Subscriber& Client::getSubscriber() {
-  Subscriber& subscriber = clientimpl->getSubscriber();
-  return subscriber;
-}
-
-// Forwards to the implementation object's publisher.
-Publisher& Client::getPublisher() {
-  Publisher& publisher = clientimpl->getPublisher();
-  return publisher;
-}
-
-// Logs the destruction, then asks the implementation object to destroy itself.
-Client::~Client() {
-  LOG4CXX_DEBUG(logger, "Client::~Client (" << this << ")");
-  this->clientimpl->Destroy();
-}
-
-