You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2017/09/04 05:04:48 UTC
[02/14] incubator-rocketmq-externals git commit: upload rocketmq-cpp
code
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/sequencer.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/disruptor/sequencer.h b/rocketmq-cpp/src/thread/disruptor/sequencer.h
new file mode 100755
index 0000000..98d617f
--- /dev/null
+++ b/rocketmq-cpp/src/thread/disruptor/sequencer.h
@@ -0,0 +1,190 @@
+// Copyright (c) 2011, François Saint-Jacques
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// * Neither the name of the disruptor-- nor the
+// names of its contributors may be used to endorse or promote products
+// derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef DISRUPTOR_SEQUENCER_H_ // NOLINT
+#define DISRUPTOR_SEQUENCER_H_ // NOLINT
+
+#include <vector>
+
+#include "batch_descriptor.h"
+#include "claim_strategy.h"
+#include "interface.h"
+#include "sequence_barrier.h"
+#include "wait_strategy.h"
+
+namespace rocketmq {
+
+// Coordinator for claiming sequences for access to a data structures while
+// tracking dependent {@link Sequence}s
+class Sequencer: public boost::noncopyable {
+ public:
+ // Construct a Sequencer with the selected strategies.
+ //
+ // @param buffer_size over which sequences are valid.
+ // @param claim_strategy_option for those claiming sequences.
+ // @param wait_strategy_option for those waiting on sequences.
+ Sequencer(int buffer_size,
+ ClaimStrategyOption claim_strategy_option,
+ WaitStrategyOption wait_strategy_option) :
+ buffer_size_(buffer_size),
+ claim_strategy_(CreateClaimStrategy(claim_strategy_option,
+ buffer_size_)),
+ wait_strategy_(CreateWaitStrategy(wait_strategy_option)) { }
+
+ ~Sequencer() {
+ delete claim_strategy_;
+ delete wait_strategy_;
+ }
+
+ // Set the sequences that will gate publishers to prevent the buffer
+ // wrapping.
+ //
+ // @param sequences to be gated on.
+ void set_gating_sequences(
+ const std::vector<Sequence*>& sequences) {
+ gating_sequences_ = sequences;
+ }
+
+ // Create a {@link SequenceBarrier} that gates on the cursor and a list of
+ // {@link Sequence}s.
+ //
+ // @param sequences_to_track this barrier will track.
+ // @return the barrier gated as required.
+ ProcessingSequenceBarrier* NewBarrier(
+ const std::vector<Sequence*>& sequences_to_track) {
+ return new ProcessingSequenceBarrier(wait_strategy_, &cursor_,
+ sequences_to_track);
+ }
+
+ // Create a new {@link BatchDescriptor} that is the minimum of the
+ // requested size and the buffer_size.
+ //
+ // @param size for the new batch.
+ // @return the new {@link BatchDescriptor}.
+ BatchDescriptor* NewBatchDescriptor(const int& size) {
+ return new BatchDescriptor(size<buffer_size_?size:buffer_size_);
+ }
+
+ // The capacity of the data structure to hold entries.
+ //
+ // @return capacity of the data structure.
+ int buffer_size() { return buffer_size_; }
+
+
+ // Get the value of the cursor indicating the published sequence.
+ //
+ // @return value of the cursor for events that have been published.
+ int64_t GetCursor() { return cursor_.sequence(); }
+
+ // Has the buffer capacity left to allocate another sequence. This is a
+ // concurrent method so the response should only be taken as an indication
+ // of available capacity.
+ //
+ // @return true if the buffer has the capacity to allocated another event.
+ bool HasAvalaibleCapacity() {
+ return claim_strategy_->HasAvalaibleCapacity(gating_sequences_);
+ }
+
+ // Claim the next event in sequence for publishing to the {@link RingBuffer}.
+ //
+ // @return the claimed sequence.
+ int64_t Next() {
+ return claim_strategy_->IncrementAndGet(gating_sequences_);
+ }
+
+ // Claim the next batch of sequence numbers for publishing.
+ //
+ // @param batch_descriptor to be updated for the batch range.
+ // @return the updated batch_descriptor.
+ BatchDescriptor* Next(BatchDescriptor* batch_descriptor) {
+ int64_t sequence = claim_strategy_->IncrementAndGet(batch_descriptor->size(), gating_sequences_);
+ batch_descriptor->set_end(sequence);
+ return batch_descriptor;
+ }
+
+ // Claim a specific sequence when only one publisher is involved.
+ //
+ // @param sequence to be claimed.
+ // @return sequence just claime.
+ int64_t Claim(const int64_t& sequence) {
+ claim_strategy_->SetSequence(sequence, gating_sequences_);
+ return sequence;
+ }
+
+ // Publish an event and make it visible to {@link EventProcessor}s.
+ //
+ // @param sequence to be published.
+ void Publish(const int64_t& sequence) {
+ Publish(sequence, 1);
+ }
+
+ // Publish the batch of events in sequence.
+ //
+ // @param sequence to be published.
+ void Publish(const BatchDescriptor& batch_descriptor) {
+ Publish(batch_descriptor.end(), batch_descriptor.size());
+ }
+
+ // Force the publication of a cursor sequence.
+ //
+ // Only use this method when forcing a sequence and you are sure only one
+ // publisher exists. This will cause the cursor to advance to this
+ // sequence.
+ //
+ // @param sequence to which is to be forced for publication.
+ void ForcePublish(const int64_t& sequence) {
+ cursor_.set_sequence(sequence);
+ wait_strategy_->SignalAllWhenBlocking();
+ }
+
+ // TODO(fsaintjacques): This was added to overcome
+ // NoOpEventProcessor::GetSequence(), this is not a clean solution.
+ Sequence* GetSequencePtr() {
+ return &cursor_;
+ }
+
+ private:
+ // Helpers
+ void Publish(const int64_t& sequence, const int64_t& batch_size) {
+ //LOG_DEBUG("publish sequence:%d", sequence);
+ claim_strategy_->SerialisePublishing(sequence, cursor_, batch_size);
+ cursor_.set_sequence(sequence);
+ wait_strategy_->SignalAllWhenBlocking();
+ }
+
+ // Members
+ const int buffer_size_;
+
+ PaddedSequence cursor_;
+ std::vector<Sequence*> gating_sequences_;
+
+ ClaimStrategyInterface* claim_strategy_;
+ WaitStrategyInterface* wait_strategy_;
+
+};
+
+}; // namespace rocketmq
+
+#endif // DISRUPTOR_RING_BUFFER_H_ NOLINT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/utils.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/disruptor/utils.h b/rocketmq-cpp/src/thread/disruptor/utils.h
new file mode 100755
index 0000000..0730093
--- /dev/null
+++ b/rocketmq-cpp/src/thread/disruptor/utils.h
@@ -0,0 +1,35 @@
+// Copyright (c) 2011, François Saint-Jacques
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// * Neither the name of the disruptor-- nor the
+// names of its contributors may be used to endorse or promote products
+// derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef DISRUPTOR_UTILS_H_ // NOLINT
+#define DISRUPTOR_UTILS_H_ // NOLINT
+
+// From Google C++ Standard, modified to use C++11 deleted functions.
+// A macro to disallow the copy constructor and operator= functions.
+#define DISALLOW_COPY_AND_ASSIGN(TypeName) \
+ TypeName(const TypeName&) delete \
+ void operator=(const TypeName&) delete;
+
+#endif // DISRUPTOR_UTILS_H_ NOLINT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/wait_strategy.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/disruptor/wait_strategy.h b/rocketmq-cpp/src/thread/disruptor/wait_strategy.h
new file mode 100755
index 0000000..fb2e58a
--- /dev/null
+++ b/rocketmq-cpp/src/thread/disruptor/wait_strategy.h
@@ -0,0 +1,377 @@
+// Copyright (c) 2011, François Saint-Jacques
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+// * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+// * Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// * Neither the name of the disruptor-- nor the
+// names of its contributors may be used to endorse or promote products
+// derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef DISRUPTOR_WAITSTRATEGY_H_ // NOLINT
+#define DISRUPTOR_WAITSTRATEGY_H_ // NOLINT
+
+#include <sys/time.h>
+
+#include <boost/chrono.hpp>
+#include <boost/thread.hpp>
+#include <vector>
+
+#include "exceptions.h"
+#include "interface.h"
+#include "sequence.h"
+
+namespace rocketmq {
+
+// Strategy options which are available to those waiting on a
+// {@link RingBuffer}
+enum WaitStrategyOption {
+ // This strategy uses a condition variable inside a lock to block the
+ // event procesor which saves CPU resource at the expense of lock
+ // contention.
+ kBlockingStrategy,
+ // This strategy uses a progressive back off strategy by first spinning,
+ // then yielding, then sleeping for 1ms period. This is a good strategy
+ // for burst traffic then quiet periods when latency is not critical.
+ kSleepingStrategy,
+ // This strategy calls Thread.yield() in a loop as a waiting strategy
+ // which reduces contention at the expense of CPU resource.
+ kYieldingStrategy,
+ // This strategy call spins in a loop as a waiting strategy which is
+ // lowest and most consistent latency but ties up a CPU.
+ kBusySpinStrategy
+};
+
+// Blocking strategy that uses a lock and condition variable for
+// {@link Consumer}s waiting on a barrier.
+// This strategy should be used when performance and low-latency are not as
+// important as CPU resource.
+class BlockingStrategy : public WaitStrategyInterface {
+ public:
+ BlockingStrategy() {}
+
+ virtual int64_t WaitFor(const std::vector<Sequence*>& dependents,
+ const Sequence& cursor,
+ const SequenceBarrierInterface& barrier,
+ const int64_t& sequence) {
+ int64_t available_sequence = 0;
+ // We need to wait.
+ if ((available_sequence = cursor.sequence()) < sequence) {
+ // acquire lock
+ boost::unique_lock<boost::recursive_mutex> ulock(mutex_);
+ while ((available_sequence = cursor.sequence()) < sequence) {
+ barrier.CheckAlert();
+ consumer_notify_condition_.wait(ulock);
+ }
+ } // unlock happens here, on ulock destruction.
+
+ if (0 != dependents.size()) {
+ while ((available_sequence = GetMinimumSequence(dependents)) < \
+ sequence) {
+ barrier.CheckAlert();
+ }
+ }
+
+ return available_sequence;
+ }
+
+ virtual int64_t WaitFor(const std::vector<Sequence*>& dependents,
+ const Sequence& cursor,
+ const SequenceBarrierInterface& barrier,
+ const int64_t& sequence,
+ const int64_t& timeout_micros) {
+ int64_t available_sequence = 0;
+ // We have to wait
+ if ((available_sequence = cursor.sequence()) < sequence) {
+ boost::unique_lock<boost::recursive_mutex> ulock(mutex_);
+ while ((available_sequence = cursor.sequence()) < sequence) {
+ barrier.CheckAlert();
+ if (boost::cv_status::timeout == consumer_notify_condition_.wait_for(ulock,
+ boost::chrono::microseconds(timeout_micros)))
+ break;
+
+ }
+ } // unlock happens here, on ulock destruction
+
+ if (0 != dependents.size()) {
+ while ((available_sequence = GetMinimumSequence(dependents)) \
+ < sequence) {
+ barrier.CheckAlert();
+ }
+ }
+
+ return available_sequence;
+ }
+
+ virtual void SignalAllWhenBlocking() {
+ boost::unique_lock<boost::recursive_mutex> ulock(mutex_);
+ consumer_notify_condition_.notify_all();
+ }
+
+ private:
+ boost::recursive_mutex mutex_;
+ boost::condition_variable_any consumer_notify_condition_;
+
+};
+
+// Sleeping strategy
+class SleepingStrategy : public WaitStrategyInterface {
+ public:
+ SleepingStrategy() {}
+
+ virtual int64_t WaitFor(const std::vector<Sequence*>& dependents,
+ const Sequence& cursor,
+ const SequenceBarrierInterface& barrier,
+ const int64_t& sequence) {
+ int64_t available_sequence = 0;
+ int counter = kRetries;
+
+ if (0 == dependents.size()) {
+ while ((available_sequence = cursor.sequence()) < sequence) {
+ counter = ApplyWaitMethod(barrier, counter);
+ }
+ } else {
+ while ((available_sequence = GetMinimumSequence(dependents)) < \
+ sequence) {
+ counter = ApplyWaitMethod(barrier, counter);
+ }
+ }
+
+ return available_sequence;
+ }
+
+ virtual int64_t WaitFor(const std::vector<Sequence*>& dependents,
+ const Sequence& cursor,
+ const SequenceBarrierInterface& barrier,
+ const int64_t& sequence,
+ const int64_t& timeout_micros) {
+ // timing
+ struct timeval start_time, end_time;
+ gettimeofday(&start_time, NULL);
+ int64_t start_micro = start_time.tv_sec*1000000 + start_time.tv_usec;
+
+ int64_t available_sequence = 0;
+ int counter = kRetries;
+
+ if (0 == dependents.size()) {
+ while ((available_sequence = cursor.sequence()) < sequence) {
+ counter = ApplyWaitMethod(barrier, counter);
+ gettimeofday(&end_time, NULL);
+ int64_t end_micro = end_time.tv_sec*1000000 + end_time.tv_usec;
+ if (timeout_micros < (end_micro - start_micro))
+ break;
+ }
+ } else {
+ while ((available_sequence = GetMinimumSequence(dependents)) < \
+ sequence) {
+ counter = ApplyWaitMethod(barrier, counter);
+ gettimeofday(&end_time, NULL);
+ int64_t end_micro = end_time.tv_sec*1000000 + end_time.tv_usec;
+ if (timeout_micros < (end_micro - start_micro))
+ break;
+ }
+ }
+
+ return available_sequence;
+ }
+
+ virtual void SignalAllWhenBlocking() {}
+
+ static const int kRetries = 200;
+
+ private:
+ int ApplyWaitMethod(const SequenceBarrierInterface& barrier, int counter) {
+ barrier.CheckAlert();
+ if (counter > 100) {
+ counter--;
+ } else if (counter > 0) {
+ counter--;
+ boost::this_thread::yield();
+ } else {
+ boost::this_thread::sleep_for(boost::chrono::milliseconds(1));
+ }
+
+ return counter;
+ }
+
+};
+
+// Yielding strategy that uses a sleep(0) for {@link EventProcessor}s waiting
+// on a barrier. This strategy is a good compromise between performance and
+// CPU resource.
+class YieldingStrategy : public WaitStrategyInterface {
+ public:
+ YieldingStrategy() {}
+
+ virtual int64_t WaitFor(const std::vector<Sequence*>& dependents,
+ const Sequence& cursor,
+ const SequenceBarrierInterface& barrier,
+ const int64_t& sequence) {
+ int64_t available_sequence = 0;
+ int counter = kSpinTries;
+
+ if (0 == dependents.size()) {
+ while ((available_sequence = cursor.sequence()) < sequence) {
+ counter = ApplyWaitMethod(barrier, counter);
+ }
+ } else {
+ while ((available_sequence = GetMinimumSequence(dependents)) < \
+ sequence) {
+ counter = ApplyWaitMethod(barrier, counter);
+ }
+ }
+
+ return available_sequence;
+ }
+
+ virtual int64_t WaitFor(const std::vector<Sequence*>& dependents,
+ const Sequence& cursor,
+ const SequenceBarrierInterface& barrier,
+ const int64_t& sequence,
+ const int64_t& timeout_micros) {
+ struct timeval start_time, end_time;
+ gettimeofday(&start_time, NULL);
+ int64_t start_micro = start_time.tv_sec*1000000 + start_time.tv_usec;
+
+ int64_t available_sequence = 0;
+ int counter = kSpinTries;
+
+ if (0 == dependents.size()) {
+ while ((available_sequence = cursor.sequence()) < sequence) {
+ counter = ApplyWaitMethod(barrier, counter);
+ gettimeofday(&end_time, NULL);
+ int64_t end_micro = end_time.tv_sec*1000000 + end_time.tv_usec;
+ if (timeout_micros < (end_micro - start_micro))
+ break;
+ }
+ } else {
+ while ((available_sequence = GetMinimumSequence(dependents)) < \
+ sequence) {
+ counter = ApplyWaitMethod(barrier, counter);
+ gettimeofday(&end_time, NULL);
+ int64_t end_micro = end_time.tv_sec*1000000 + end_time.tv_usec;
+ if (timeout_micros < (end_micro - start_micro))
+ break;
+ }
+ }
+
+ return available_sequence;
+ }
+
+ virtual void SignalAllWhenBlocking() {}
+
+ static const int kSpinTries = 100;
+
+ private:
+ int ApplyWaitMethod(const SequenceBarrierInterface& barrier, int counter) {
+ barrier.CheckAlert();
+ if (counter == 0) {
+ boost::this_thread::yield();
+ } else {
+ counter--;
+ }
+
+ return counter;
+ }
+
+};
+
+
+// Busy Spin strategy that uses a busy spin loop for {@link EventProcessor}s
+// waiting on a barrier.
+// This strategy will use CPU resource to avoid syscalls which can introduce
+// latency jitter. It is best used when threads can be bound to specific
+// CPU cores.
+class BusySpinStrategy : public WaitStrategyInterface {
+ public:
+ BusySpinStrategy() {}
+
+ virtual int64_t WaitFor(const std::vector<Sequence*>& dependents,
+ const Sequence& cursor,
+ const SequenceBarrierInterface& barrier,
+ const int64_t& sequence) {
+ int64_t available_sequence = 0;
+ if (0 == dependents.size()) {
+ while ((available_sequence = cursor.sequence()) < sequence) {
+ barrier.CheckAlert();
+ }
+ } else {
+ while ((available_sequence = GetMinimumSequence(dependents)) < \
+ sequence) {
+ barrier.CheckAlert();
+ }
+ }
+
+ return available_sequence;
+ }
+
+ virtual int64_t WaitFor(const std::vector<Sequence*>& dependents,
+ const Sequence& cursor,
+ const SequenceBarrierInterface& barrier,
+ const int64_t& sequence,
+ const int64_t& timeout_micros) {
+ struct timeval start_time, end_time;
+ gettimeofday(&start_time, NULL);
+ int64_t start_micro = start_time.tv_sec*1000000 + start_time.tv_usec;
+ int64_t available_sequence = 0;
+
+ if (0 == dependents.size()) {
+ while ((available_sequence = cursor.sequence()) < sequence) {
+ barrier.CheckAlert();
+ gettimeofday(&end_time, NULL);
+ int64_t end_micro = end_time.tv_sec*1000000 + end_time.tv_usec;
+ if (timeout_micros < (end_micro - start_micro))
+ break;
+ }
+ } else {
+ while ((available_sequence = GetMinimumSequence(dependents)) < \
+ sequence) {
+ barrier.CheckAlert();
+ gettimeofday(&end_time, NULL);
+ int64_t end_micro = end_time.tv_sec*1000000 + end_time.tv_usec;
+ if (timeout_micros < (end_micro - start_micro))
+ break;
+ }
+ }
+
+ return available_sequence;
+ }
+
+ virtual void SignalAllWhenBlocking() {}
+
+};
+
+WaitStrategyInterface* CreateWaitStrategy(WaitStrategyOption wait_option) {
+ switch (wait_option) {
+ case kBlockingStrategy:
+ return new BlockingStrategy();
+ case kSleepingStrategy:
+ return new SleepingStrategy();
+ case kYieldingStrategy:
+ return new YieldingStrategy();
+ case kBusySpinStrategy:
+ return new BusySpinStrategy();
+ default:
+ return NULL;
+ }
+}
+
+
+}; // namespace rocketmq
+
+#endif // DISRUPTOR_WAITSTRATEGY_H_ NOLINT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptorLFQ.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/disruptorLFQ.h b/rocketmq-cpp/src/thread/disruptorLFQ.h
new file mode 100644
index 0000000..a05b0cd
--- /dev/null
+++ b/rocketmq-cpp/src/thread/disruptorLFQ.h
@@ -0,0 +1,113 @@
+#ifndef _DISRUPTORLFQ_
+#define _DISRUPTORLFQ_
+
+#include <disruptor/event_processor.h>
+#include <disruptor/event_publisher.h>
+#include <disruptor/exception_handler.h>
+#include <disruptor/interface.h>
+
+#include <boost/asio/io_service.hpp>
+#include <boost/bind.hpp>
+#include <boost/scoped_ptr.hpp>
+#include <boost/thread/thread.hpp>
+
+namespace rocketmq {
+class Task;
+class taskEventFactory : public EventFactoryInterface<Task> {
+ public:
+ virtual Task* NewInstance(const int& size) const;
+};
+
+class taskBatchHandler : public EventHandlerInterface<Task> {
+ public:
+ taskBatchHandler(int pullMsgThreadPoolNum);
+ virtual ~taskBatchHandler() {}
+
+ virtual void OnEvent(const int64_t& sequence, const bool& end_of_batch,
+ Task* event);
+ virtual void OnStart() {}
+ virtual void OnShutdown() {}
+ void runTaskEvent(Task event, int64_t sequence);
+ void stopIOService();
+
+ private:
+ boost::asio::io_service m_ioService;
+ boost::thread_group m_threadpool;
+ boost::asio::io_service::work m_ioServiceWork;
+};
+
+class taskEventTranslator : public EventTranslatorInterface<Task> {
+ public:
+ taskEventTranslator(Task* event);
+ virtual ~taskEventTranslator() {}
+ virtual Task* TranslateTo(const int64_t& sequence, Task* event);
+
+ private:
+ Task* m_taskEvent;
+};
+
+class taskExceptionHandler : public ExceptionHandlerInterface<Task> {
+ public:
+ virtual void Handle(const std::exception& exception, const int64_t& sequence,
+ Task* event) {}
+};
+
+class disruptorLFQ {
+ public:
+ disruptorLFQ(int threadCount) {
+ m_task_factory.reset(new taskEventFactory());
+ m_ring_buffer.reset(new RingBuffer<Task>(
+ m_task_factory.get(),
+ 1024, // default size is 1024, must be n power of 2
+ kSingleThreadedStrategy,
+ // metaq::kBusySpinStrategy);//load normal, high cpu occupy, and
+ // smallest consume latency
+ // metaq::kYieldingStrategy); //load normal, high cpu occupy, and
+ // smaller consume latency
+ // metaq::kSleepingStrategy);//load normal, lowest cpu occupy, but
+ // largest consume latency
+ kBlockingStrategy)); // load normal, lowest CPU occupy, but
+ // largest consume latency
+
+ m_sequence_to_track.reset(new std::vector<Sequence*>(0));
+ m_sequenceBarrier.reset(
+ m_ring_buffer->NewBarrier(*(m_sequence_to_track.get())));
+
+ m_task_handler.reset(new taskBatchHandler(threadCount));
+ m_task_exception_handler.reset(new taskExceptionHandler());
+ m_processor.reset(new BatchEventProcessor<Task>(
+ m_ring_buffer.get(),
+ (SequenceBarrierInterface*)m_sequenceBarrier.get(),
+ m_task_handler.get(), m_task_exception_handler.get()));
+
+ /*
+ Publisher will try to publish BUFFER_SIZE + 1 events.
+ The last event should wait for at least one consume before publishing, thus
+ preventing an overwrite.
+ After the single consume, the publisher should resume and publish the last
+ event.
+ */
+ m_gating_sequences.push_back(m_processor.get()->GetSequence());
+ m_ring_buffer->set_gating_sequences(
+ m_gating_sequences); // prevent overlap, publishEvent will be blocked
+ // on ring_buffer_->Next();
+
+ m_publisher.reset(new EventPublisher<Task>(m_ring_buffer.get()));
+ }
+ virtual ~disruptorLFQ() {}
+
+ public:
+ boost::scoped_ptr<taskEventFactory> m_task_factory;
+ boost::scoped_ptr<taskBatchHandler> m_task_handler;
+ boost::scoped_ptr<taskExceptionHandler> m_task_exception_handler;
+ boost::scoped_ptr<std::vector<Sequence*>> m_sequence_to_track;
+ boost::scoped_ptr<RingBuffer<Task>> m_ring_buffer;
+ boost::scoped_ptr<ProcessingSequenceBarrier> m_sequenceBarrier;
+ boost::scoped_ptr<BatchEventProcessor<Task>> m_processor;
+ boost::scoped_ptr<EventPublisher<Task>> m_publisher;
+ std::vector<Sequence*> m_gating_sequences;
+};
+}
+//<!***************************************************************************
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/task_queue.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/task_queue.cpp b/rocketmq-cpp/src/thread/task_queue.cpp
new file mode 100755
index 0000000..510425c
--- /dev/null
+++ b/rocketmq-cpp/src/thread/task_queue.cpp
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "task_queue.h"
+#include <sys/prctl.h>
+#include "UtilAll.h"
+#include "disruptorLFQ.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+Task* taskEventFactory::NewInstance(const int& size) const {
+ return new Task[size];
+}
+
+taskBatchHandler::taskBatchHandler(int pullMsgThreadPoolNum)
+ : m_ioServiceWork(m_ioService) {
+ string taskName = UtilAll::getProcessName();
+ prctl(PR_SET_NAME, "PullMsgTP", 0, 0, 0);
+ for (int i = 0; i != pullMsgThreadPoolNum; ++i) {
+ m_threadpool.create_thread(
+ boost::bind(&boost::asio::io_service::run, &m_ioService));
+ }
+ prctl(PR_SET_NAME, taskName.c_str(), 0, 0, 0);
+}
+
+void taskBatchHandler::OnEvent(const int64_t& sequence,
+ const bool& end_of_batch, Task* event) {
+ //cp Task event out, avoid publish event override current Task event
+ Task currentTask(*event);
+ m_ioService.post(boost::bind(&taskBatchHandler::runTaskEvent, this,
+ currentTask, sequence));
+}
+
+void taskBatchHandler::runTaskEvent(Task event, int64_t sequence) {
+ // LOG_INFO("processor event sequence:%lld", sequence);
+ event.run();
+}
+
+void taskBatchHandler::stopIOService() {
+ m_ioService.stop();
+ m_threadpool.join_all();
+}
+
+taskEventTranslator::taskEventTranslator(Task* event) : m_taskEvent(event) {}
+
+Task* taskEventTranslator::TranslateTo(const int64_t& sequence, Task* event) {
+ // LOG_INFO("publish sequence:%lld, event:%x", sequence, event);
+ *event = *m_taskEvent;
+ return event;
+};
+
+//******************************************************************************************8
+TaskQueue::TaskQueue(int threadCount) {
+ m_flag.store(true, boost::memory_order_release);
+ m_disruptorLFQ = new disruptorLFQ(threadCount);
+}
+
+TaskQueue::~TaskQueue() {
+ delete m_disruptorLFQ;
+ m_disruptorLFQ = NULL;
+}
+
+void TaskQueue::close() {
+ m_flag.store(false, boost::memory_order_release);
+ m_disruptorLFQ->m_task_handler->stopIOService();
+ m_disruptorLFQ->m_processor->Halt();
+}
+
+bool TaskQueue::bTaskQueueStatusOK() {
+ return m_flag.load(boost::memory_order_acquire) == true;
+}
+
+void TaskQueue::produce(const Task& task) {
+ boost::mutex::scoped_lock lock(m_publishLock);
+ taskEventTranslator pTranslator(const_cast<Task*>(&task));
+ m_disruptorLFQ->m_publisher->PublishEvent(&pTranslator);
+}
+
+int TaskQueue::run() {
+ while (true) {
+ m_disruptorLFQ->m_processor->Run();
+ if (m_flag.load(boost::memory_order_acquire) == false) {
+ break;
+ }
+ }
+ return 0;
+}
+
+//<!***************************************************************************
+} //<! end namespace;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/task_queue.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/task_queue.h b/rocketmq-cpp/src/thread/task_queue.h
new file mode 100755
index 0000000..e60607c
--- /dev/null
+++ b/rocketmq-cpp/src/thread/task_queue.h
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *//********************************************************************
+author: qiwei.wqw@alibaba-inc.com
+*********************************************************************/
+#ifndef _TASK_QUEUE_I_
+#define _TASK_QUEUE_I_
+
+#include <boost/atomic.hpp>
+#include <boost/thread/mutex.hpp>
+#include <list>
+#include <vector>
+using namespace std;
+
+namespace rocketmq {
+
+//<!***************************************************************************
+typedef void (*taskfunc)(void*);
+
+//<! 数据加操作的集合
+class ITask_impl {
+ public:
+ virtual ~ITask_impl() {}
+ virtual void run() = 0;
+ virtual ITask_impl* fork() = 0;
+};
+
+//<!***************************************************************************
+class Task_impl : public ITask_impl {
+ public:
+ Task_impl(taskfunc func, void* arg_) : m_func(func), m_arg(arg_) {}
+ virtual ~Task_impl() {
+ m_func = 0;
+ m_arg = 0;
+ }
+ virtual void run() {
+ if (m_func != 0) m_func(m_arg);
+ }
+ virtual ITask_impl* fork() { return new Task_impl(m_func, m_arg); }
+
+ protected:
+ taskfunc m_func;
+ void* m_arg;
+};
+
+//<!***************************************************************************
+//<! 构造ITask_impl的子类对象时,为其赋予不同的数据和操作即可。
+//<! 这里使用了组合的方式实现了接口和实现的分离;
+//<!***************************************************************************
+struct Task {
+ static void dumy(void*) {}
+
+ Task(taskfunc f_, void* d_) { m_pTaskImpl = new Task_impl(f_, d_); }
+ Task(ITask_impl* task_imp_) : m_pTaskImpl(task_imp_) {}
+ Task(const Task& src_) : m_pTaskImpl(src_.m_pTaskImpl->fork()) {}
+ Task() { m_pTaskImpl = new Task_impl(&Task::dumy, 0); }
+ virtual ~Task() { delete m_pTaskImpl; }
+ Task& operator=(const Task& src_) {
+ delete m_pTaskImpl;
+ m_pTaskImpl = src_.m_pTaskImpl->fork();
+ return *this;
+ }
+ void run() {
+ if (m_pTaskImpl) m_pTaskImpl->run();
+ }
+
+ private:
+ ITask_impl* m_pTaskImpl;
+};
+
+//<!***************************************************************************
+class ITaskQueue {
+ public:
+ typedef list<Task> TaskList;
+
+ public:
+ virtual ~ITaskQueue() {}
+ virtual void close() = 0;
+ virtual void produce(const Task& task) = 0;
+ // virtual void multi_produce(const TaskList& tasks) = 0;
+ // virtual int consume(Task& task) = 0;
+ // virtual int consume_all(TaskList& tasks) = 0;
+ virtual int run() = 0;
+ // virtual int batch_run() = 0;
+ virtual bool bTaskQueueStatusOK() = 0;
+};
+
+//<!***************************************************************************
+//<! 由于不同的操作和数据可能需要构造不同ITask_impl子类,
+//<!
+//我们需要提供一些泛型函数,能够将用户的所有操作和数据都能轻易的转换成Task对象。
+//<! TaskBinder 提供一系列的gen函数,能够转换用户的普通函数和数据为Task对象;
+//<!***************************************************************************
+struct TaskBinder {
+ static Task gen(void (*func)(void*), void* p_) { return Task(func, p_); }
+
+ template <typename RET>
+ static Task gen(RET (*func)(void)) {
+ struct lambda {
+ static void taskfunc(void* p_) { (*(RET(*)(void))p_)(); };
+ };
+ return Task(lambda::taskfunc, (void*)func);
+ }
+
+ template <typename FUNCT, typename ARG1>
+ static Task gen(FUNCT func, ARG1 arg1) {
+ struct lambda : public ITask_impl {
+ FUNCT dest_func;
+ ARG1 arg1;
+ lambda(FUNCT func, const ARG1& arg1) : dest_func(func), arg1(arg1) {}
+ virtual void run() { (*dest_func)(arg1); }
+ virtual ITask_impl* fork() { return new lambda(dest_func, arg1); }
+ };
+ return Task(new lambda(func, arg1));
+ }
+
+ template <typename FUNCT, typename ARG1, typename ARG2>
+ static Task gen(FUNCT func, ARG1 arg1, ARG2 arg2) {
+ struct lambda : public ITask_impl {
+ FUNCT dest_func;
+ ARG1 arg1;
+ ARG2 arg2;
+ lambda(FUNCT func, const ARG1& arg1, const ARG2& arg2)
+ : dest_func(func), arg1(arg1), arg2(arg2) {}
+ virtual void run() { (*dest_func)(arg1, arg2); }
+ virtual ITask_impl* fork() { return new lambda(dest_func, arg1, arg2); }
+ };
+ return Task(new lambda(func, arg1, arg2));
+ }
+
+ template <typename FUNCT, typename ARG1, typename ARG2, typename ARG3>
+ static Task gen(FUNCT func, ARG1 arg1, ARG2 arg2, ARG3 arg3) {
+ struct lambda : public ITask_impl {
+ FUNCT dest_func;
+ ARG1 arg1;
+ ARG2 arg2;
+ ARG3 arg3;
+ lambda(FUNCT func, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3)
+ : dest_func(func), arg1(arg1), arg2(arg2), arg3(arg3) {}
+ virtual void run() { (*dest_func)(arg1, arg2, arg3); }
+ virtual ITask_impl* fork() {
+ return new lambda(dest_func, arg1, arg2, arg3);
+ }
+ };
+ return Task(new lambda(func, arg1, arg2, arg3));
+ }
+
+ template <typename FUNCT, typename ARG1, typename ARG2, typename ARG3,
+ typename ARG4>
+ static Task gen(FUNCT func, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4) {
+ struct lambda : public ITask_impl {
+ FUNCT dest_func;
+ ARG1 arg1;
+ ARG2 arg2;
+ ARG3 arg3;
+ ARG4 arg4;
+ lambda(FUNCT func, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3,
+ const ARG4& arg4)
+ : dest_func(func), arg1(arg1), arg2(arg2), arg3(arg3), arg4(arg4) {}
+ virtual void run() { (*dest_func)(arg1, arg2, arg3, arg4); }
+ virtual ITask_impl* fork() {
+ return new lambda(dest_func, arg1, arg2, arg3, arg4);
+ }
+ };
+ return Task(new lambda(func, arg1, arg2, arg3, arg4));
+ }
+
+ template <typename FUNCT, typename ARG1, typename ARG2, typename ARG3,
+ typename ARG4, typename ARG5>
+ static Task gen(FUNCT func, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4,
+ ARG5 arg5) {
+ struct lambda : public ITask_impl {
+ FUNCT dest_func;
+ ARG1 arg1;
+ ARG2 arg2;
+ ARG3 arg3;
+ ARG4 arg4;
+ ARG5 arg5;
+ lambda(FUNCT func, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3,
+ const ARG4& arg4, const ARG5& arg5)
+ : dest_func(func),
+ arg1(arg1),
+ arg2(arg2),
+ arg3(arg3),
+ arg4(arg4),
+ arg5(arg5) {}
+ virtual void run() { (*dest_func)(arg1, arg2, arg3, arg4, arg5); }
+ virtual ITask_impl* fork() {
+ return new lambda(dest_func, arg1, arg2, arg3, arg4, arg5);
+ }
+ };
+ return Task(new lambda(func, arg1, arg2, arg3, arg4, arg5));
+ }
+
+ template <typename FUNCT, typename ARG1, typename ARG2, typename ARG3,
+ typename ARG4, typename ARG5, typename ARG6>
+ static Task gen(FUNCT func, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4,
+ ARG5 arg5, ARG6 arg6) {
+ struct lambda : public ITask_impl {
+ FUNCT dest_func;
+ ARG1 arg1;
+ ARG2 arg2;
+ ARG3 arg3;
+ ARG4 arg4;
+ ARG5 arg5;
+ ARG6 arg6;
+ lambda(FUNCT func, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3,
+ const ARG4& arg4, const ARG5& arg5, const ARG6& arg6)
+ : dest_func(func),
+ arg1(arg1),
+ arg2(arg2),
+ arg3(arg3),
+ arg4(arg4),
+ arg5(arg5),
+ arg6(arg6) {}
+ virtual void run() { (*dest_func)(arg1, arg2, arg3, arg4, arg5, arg6); }
+ virtual ITask_impl* fork() {
+ return new lambda(dest_func, arg1, arg2, arg3, arg4, arg5, arg6);
+ }
+ };
+ return Task(new lambda(func, arg1, arg2, arg3, arg4, arg5, arg6));
+ }
+
+ template <typename FUNCT, typename ARG1, typename ARG2, typename ARG3,
+ typename ARG4, typename ARG5, typename ARG6, typename ARG7>
+ static Task gen(FUNCT func, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4,
+ ARG5 arg5, ARG6 arg6, ARG7 arg7) {
+ struct lambda : public ITask_impl {
+ FUNCT dest_func;
+ ARG1 arg1;
+ ARG2 arg2;
+ ARG3 arg3;
+ ARG4 arg4;
+ ARG5 arg5;
+ ARG6 arg6;
+ ARG7 arg7;
+ lambda(FUNCT func, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3,
+ const ARG4& arg4, const ARG5& arg5, const ARG6& arg6,
+ const ARG7& arg7)
+ : dest_func(func),
+ arg1(arg1),
+ arg2(arg2),
+ arg3(arg3),
+ arg4(arg4),
+ arg5(arg5),
+ arg6(arg6),
+ arg7(arg7) {}
+ virtual void run() {
+ (*dest_func)(arg1, arg2, arg3, arg4, arg5, arg6, arg7);
+ }
+ virtual ITask_impl* fork() {
+ return new lambda(dest_func, arg1, arg2, arg3, arg4, arg5, arg6, arg7);
+ }
+ };
+ return Task(new lambda(func, arg1, arg2, arg3, arg4, arg5, arg6, arg7));
+ }
+
+ template <typename FUNCT, typename ARG1, typename ARG2, typename ARG3,
+ typename ARG4, typename ARG5, typename ARG6, typename ARG7,
+ typename ARG8>
+ static Task gen(FUNCT func, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4,
+ ARG5 arg5, ARG6 arg6, ARG7 arg7, ARG8 arg8) {
+ struct lambda : public ITask_impl {
+ FUNCT dest_func;
+ ARG1 arg1;
+ ARG2 arg2;
+ ARG3 arg3;
+ ARG4 arg4;
+ ARG5 arg5;
+ ARG6 arg6;
+ ARG7 arg7;
+ ARG8 arg8;
+ lambda(FUNCT func, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3,
+ const ARG4& arg4, const ARG5& arg5, const ARG6& arg6,
+ const ARG7& arg7, const ARG8& arg8)
+ : dest_func(func),
+ arg1(arg1),
+ arg2(arg2),
+ arg3(arg3),
+ arg4(arg4),
+ arg5(arg5),
+ arg6(arg6),
+ arg7(arg7),
+ arg8(arg8) {}
+ virtual void run() {
+ (*dest_func)(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8);
+ }
+ virtual ITask_impl* fork() {
+ return new lambda(dest_func, arg1, arg2, arg3, arg4, arg5, arg6, arg7,
+ arg8);
+ }
+ };
+ return Task(
+ new lambda(func, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8));
+ }
+
+ template <typename FUNCT, typename ARG1, typename ARG2, typename ARG3,
+ typename ARG4, typename ARG5, typename ARG6, typename ARG7,
+ typename ARG8, typename ARG9>
+ static Task gen(FUNCT func, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4,
+ ARG5 arg5, ARG6 arg6, ARG7 arg7, ARG8 arg8, ARG9 arg9) {
+ struct lambda : public ITask_impl {
+ FUNCT dest_func;
+ ARG1 arg1;
+ ARG2 arg2;
+ ARG3 arg3;
+ ARG4 arg4;
+ ARG5 arg5;
+ ARG6 arg6;
+ ARG7 arg7;
+ ARG8 arg8;
+ ARG9 arg9;
+ lambda(FUNCT func, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3,
+ const ARG4& arg4, const ARG5& arg5, const ARG6& arg6,
+ const ARG7& arg7, const ARG8& arg8, const ARG9& arg9)
+ : dest_func(func),
+ arg1(arg1),
+ arg2(arg2),
+ arg3(arg3),
+ arg4(arg4),
+ arg5(arg5),
+ arg6(arg6),
+ arg7(arg7),
+ arg8(arg8),
+ arg9(arg9) {}
+ virtual void run() {
+ (*dest_func)(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9);
+ }
+ virtual ITask_impl* fork() {
+ return new lambda(dest_func, arg1, arg2, arg3, arg4, arg5, arg6, arg7,
+ arg8, arg9);
+ }
+ };
+ return Task(
+ new lambda(func, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9));
+ }
+
+ //<!***************************************************************************
+ //<! class fuctions;;
+ //<!***************************************************************************
+ template <typename T, typename RET>
+ static Task gen(RET (T::*func)(void), T* obj) {
+ struct lambda : public ITask_impl {
+ RET (T::*dest_func)(void);
+ T* obj;
+ lambda(RET (T::*func)(void), T* obj) : dest_func(func), obj(obj) {}
+ virtual void run() { (obj->*dest_func)(); }
+ virtual ITask_impl* fork() { return new lambda(dest_func, obj); }
+ };
+ return Task(new lambda(func, obj));
+ }
+
+ template <typename T, typename RET, typename FARG1, typename ARG1>
+ static Task gen(RET (T::*func)(FARG1), T* obj, ARG1 arg1) {
+ struct lambda : public ITask_impl {
+ RET (T::*dest_func)(FARG1);
+ T* obj;
+ ARG1 arg1;
+ lambda(RET (T::*pfunc)(FARG1), T* obj, const ARG1& arg1)
+ : dest_func(pfunc), obj(obj), arg1(arg1) {}
+ virtual void run() { (obj->*dest_func)(arg1); }
+ virtual ITask_impl* fork() { return new lambda(dest_func, obj, arg1); }
+ };
+ return Task(new lambda(func, obj, arg1));
+ }
+
+ template <typename T, typename RET, typename FARG1, typename FARG2,
+ typename ARG1, typename ARG2>
+ static Task gen(RET (T::*func)(FARG1, FARG2), T* obj, ARG1 arg1, ARG2 arg2) {
+ struct lambda : public ITask_impl {
+ RET (T::*dest_func)(FARG1, FARG2);
+ T* obj;
+ ARG1 arg1;
+ ARG2 arg2;
+ lambda(RET (T::*func)(FARG1, FARG2), T* obj, const ARG1& arg1,
+ const ARG2& arg2)
+ : dest_func(func), obj(obj), arg1(arg1), arg2(arg2) {}
+ virtual void run() { (obj->*dest_func)(arg1, arg2); }
+ virtual ITask_impl* fork() {
+ return new lambda(dest_func, obj, arg1, arg2);
+ }
+ };
+ return Task(new lambda(func, obj, arg1, arg2));
+ }
+
+ template <typename T, typename RET, typename FARG1, typename FARG2,
+ typename FARG3, typename ARG1, typename ARG2, typename ARG3>
+ static Task gen(RET (T::*func)(FARG1, FARG2, FARG3), T* obj, ARG1 arg1,
+ ARG2 arg2, ARG3 arg3) {
+ struct lambda : public ITask_impl {
+ RET (T::*dest_func)(FARG1, FARG2, FARG3);
+ T* obj;
+ ARG1 arg1;
+ ARG2 arg2;
+ ARG3 arg3;
+ lambda(RET (T::*func)(FARG1, FARG2, FARG3), T* obj, const ARG1& arg1,
+ const ARG2& arg2, const ARG3& arg3)
+ : dest_func(func), obj(obj), arg1(arg1), arg2(arg2), arg3(arg3) {}
+ virtual void run() { (obj->*dest_func)(arg1, arg2, arg3); }
+ virtual ITask_impl* fork() {
+ return new lambda(dest_func, obj, arg1, arg2, arg3);
+ }
+ };
+ return Task(new lambda(func, obj, arg1, arg2, arg3));
+ }
+
+ template <typename T, typename RET, typename FARG1, typename FARG2,
+ typename FARG3, typename FARG4, typename ARG1, typename ARG2,
+ typename ARG3, typename ARG4>
+ static Task gen(RET (T::*func)(FARG1, FARG2, FARG3, FARG4), T* obj, ARG1 arg1,
+ ARG2 arg2, ARG3 arg3, ARG4 arg4) {
+ struct lambda : public ITask_impl {
+ RET (T::*dest_func)(FARG1, FARG2, FARG3, FARG4);
+ T* obj;
+ ARG1 arg1;
+ ARG2 arg2;
+ ARG3 arg3;
+ ARG4 arg4;
+ lambda(RET (T::*func)(FARG1, FARG2, FARG3, FARG4), T* obj,
+ const ARG1& arg1, const ARG2& arg2, const ARG3& arg3,
+ const ARG4& arg4)
+ : dest_func(func),
+ obj(obj),
+ arg1(arg1),
+ arg2(arg2),
+ arg3(arg3),
+ arg4(arg4) {}
+ virtual void run() { (obj->*dest_func)(arg1, arg2, arg3, arg4); }
+ virtual ITask_impl* fork() {
+ return new lambda(dest_func, obj, arg1, arg2, arg3, arg4);
+ }
+ };
+ return Task(new lambda(func, obj, arg1, arg2, arg3, arg4));
+ }
+
+ template <typename T, typename RET, typename FARG1, typename FARG2,
+ typename FARG3, typename FARG4, typename FARG5, typename ARG1,
+ typename ARG2, typename ARG3, typename ARG4, typename ARG5>
+ static Task gen(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5), T* obj,
+ ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4, ARG5 arg5) {
+ struct lambda : public ITask_impl {
+ RET (T::*dest_func)(FARG1, FARG2, FARG3, FARG4, FARG5);
+ T* obj;
+ ARG1 arg1;
+ ARG2 arg2;
+ ARG3 arg3;
+ ARG4 arg4;
+ ARG5 arg5;
+ lambda(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5), T* obj,
+ const ARG1& arg1, const ARG2& arg2, const ARG3& arg3,
+ const ARG4& arg4, const ARG5& arg5)
+ : dest_func(func),
+ obj(obj),
+ arg1(arg1),
+ arg2(arg2),
+ arg3(arg3),
+ arg4(arg4),
+ arg5(arg5) {}
+ virtual void run() { (obj->*dest_func)(arg1, arg2, arg3, arg4, arg5); }
+ virtual ITask_impl* fork() {
+ return new lambda(dest_func, obj, arg1, arg2, arg3, arg4, arg5);
+ }
+ };
+ return Task(new lambda(func, obj, arg1, arg2, arg3, arg4, arg5));
+ }
+
+ template <typename T, typename RET, typename FARG1, typename FARG2,
+ typename FARG3, typename FARG4, typename FARG5, typename FARG6,
+ typename ARG1, typename ARG2, typename ARG3, typename ARG4,
+ typename ARG5, typename ARG6>
+ static Task gen(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6),
+ T* obj, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4, ARG5 arg5,
+ ARG6 arg6) {
+ struct lambda : public ITask_impl {
+ RET (T::*dest_func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6);
+ T* obj;
+ ARG1 arg1;
+ ARG2 arg2;
+ ARG3 arg3;
+ ARG4 arg4;
+ ARG5 arg5;
+ ARG6 arg6;
+ lambda(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6), T* obj,
+ const ARG1& arg1, const ARG2& arg2, const ARG3& arg3,
+ const ARG4& arg4, const ARG5& arg5, const ARG6& arg6)
+ : dest_func(func),
+ obj(obj),
+ arg1(arg1),
+ arg2(arg2),
+ arg3(arg3),
+ arg4(arg4),
+ arg5(arg5),
+ arg6(arg6) {}
+ virtual void run() {
+ (obj->*dest_func)(arg1, arg2, arg3, arg4, arg5, arg6);
+ }
+ virtual ITask_impl* fork() {
+ return new lambda(dest_func, obj, arg1, arg2, arg3, arg4, arg5, arg6);
+ }
+ };
+ return Task(new lambda(func, obj, arg1, arg2, arg3, arg4, arg5, arg6));
+ }
+
+ template <typename T, typename RET, typename FARG1, typename FARG2,
+ typename FARG3, typename FARG4, typename FARG5, typename FARG6,
+ typename FARG7, typename ARG1, typename ARG2, typename ARG3,
+ typename ARG4, typename ARG5, typename ARG6, typename ARG7>
+ static Task gen(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6,
+ FARG7),
+ T* obj, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4, ARG5 arg5,
+ ARG6 arg6, ARG7 arg7) {
+ struct lambda : public ITask_impl {
+ RET (T::*dest_func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6, FARG7);
+ T* obj;
+ ARG1 arg1;
+ ARG2 arg2;
+ ARG3 arg3;
+ ARG4 arg4;
+ ARG5 arg5;
+ ARG6 arg6;
+ ARG7 arg7;
+ lambda(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6, FARG7),
+ T* obj, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3,
+ const ARG4& arg4, const ARG5& arg5, const ARG6& arg6,
+ const ARG7& arg7)
+ : dest_func(func),
+ obj(obj),
+ arg1(arg1),
+ arg2(arg2),
+ arg3(arg3),
+ arg4(arg4),
+ arg5(arg5),
+ arg6(arg6),
+ arg7(arg7) {}
+ virtual void run() {
+ (obj->*dest_func)(arg1, arg2, arg3, arg4, arg5, arg6, arg7);
+ }
+ virtual ITask_impl* fork() {
+ return new lambda(dest_func, obj, arg1, arg2, arg3, arg4, arg5, arg6,
+ arg7);
+ }
+ };
+ return Task(
+ new lambda(func, obj, arg1, arg2, arg3, arg4, arg5, arg6, arg7));
+ }
+
+ template <typename T, typename RET, typename FARG1, typename FARG2,
+ typename FARG3, typename FARG4, typename FARG5, typename FARG6,
+ typename FARG7, typename FARG8, typename ARG1, typename ARG2,
+ typename ARG3, typename ARG4, typename ARG5, typename ARG6,
+ typename ARG7, typename ARG8>
+ static Task gen(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6,
+ FARG7, FARG8),
+ T* obj, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4, ARG5 arg5,
+ ARG6 arg6, ARG7 arg7, ARG8 arg8) {
+ struct lambda : public ITask_impl {
+ RET (T::*dest_func)
+ (FARG1, FARG2, FARG3, FARG4, FARG5, FARG6, FARG7, FARG8);
+ T* obj;
+ ARG1 arg1;
+ ARG2 arg2;
+ ARG3 arg3;
+ ARG4 arg4;
+ ARG5 arg5;
+ ARG6 arg6;
+ ARG7 arg7;
+ ARG8 arg8;
+ lambda(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6, FARG7,
+ FARG8),
+ T* obj, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3,
+ const ARG4& arg4, const ARG5& arg5, const ARG6& arg6,
+ const ARG7& arg7, const ARG8& arg8)
+ : dest_func(func),
+ obj(obj),
+ arg1(arg1),
+ arg2(arg2),
+ arg3(arg3),
+ arg4(arg4),
+ arg5(arg5),
+ arg6(arg6),
+ arg7(arg7),
+ arg8(arg8) {}
+ virtual void run() {
+ (obj->*dest_func)(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8);
+ }
+ virtual ITask_impl* fork() {
+ return new lambda(dest_func, obj, arg1, arg2, arg3, arg4, arg5, arg6,
+ arg7, arg8);
+ }
+ };
+ return Task(
+ new lambda(func, obj, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8));
+ }
+
+ template <typename T, typename RET, typename FARG1, typename FARG2,
+ typename FARG3, typename FARG4, typename FARG5, typename FARG6,
+ typename FARG7, typename FARG8, typename FARG9, typename ARG1,
+ typename ARG2, typename ARG3, typename ARG4, typename ARG5,
+ typename ARG6, typename ARG7, typename ARG8, typename ARG9>
+ static Task gen(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6,
+ FARG7, FARG8, FARG9),
+ T* obj, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4, ARG5 arg5,
+ ARG6 arg6, ARG7 arg7, ARG8 arg8, ARG9 arg9) {
+ struct lambda : public ITask_impl {
+ RET (T::*dest_func)
+ (FARG1, FARG2, FARG3, FARG4, FARG5, FARG6, FARG7, FARG8, FARG9);
+ T* obj;
+ ARG1 arg1;
+ ARG2 arg2;
+ ARG3 arg3;
+ ARG4 arg4;
+ ARG5 arg5;
+ ARG6 arg6;
+ ARG7 arg7;
+ ARG8 arg8;
+ ARG9 arg9;
+ lambda(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6, FARG7,
+ FARG8, FARG9),
+ T* obj, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3,
+ const ARG4& arg4, const ARG5& arg5, const ARG6& arg6,
+ const ARG7& arg7, const ARG8& arg8, const ARG9& arg9)
+ : dest_func(func),
+ obj(obj),
+ arg1(arg1),
+ arg2(arg2),
+ arg3(arg3),
+ arg4(arg4),
+ arg5(arg5),
+ arg6(arg6),
+ arg7(arg7),
+ arg8(arg8),
+ arg9(arg9) {}
+ virtual void run() {
+ (obj->*dest_func)(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9);
+ }
+ virtual ITask_impl* fork() {
+ return new lambda(dest_func, obj, arg1, arg2, arg3, arg4, arg5, arg6,
+ arg7, arg8, arg9);
+ }
+ };
+ return Task(new lambda(func, obj, arg1, arg2, arg3, arg4, arg5, arg6, arg7,
+ arg8, arg9));
+ }
+};
+
+//<!***************************************************************************
+class disruptorLFQ;
+class TaskQueue : public ITaskQueue {
+ public:
+ TaskQueue(int threadCount);
+ virtual ~TaskQueue();
+ virtual void close();
+ virtual void produce(const Task& task);
+ virtual int run();
+ virtual bool bTaskQueueStatusOK();
+
+ private:
+ boost::atomic<bool> m_flag;
+ disruptorLFQ* m_disruptorLFQ;
+ boost::mutex m_publishLock;
+};
+
+//<!***************************************************************************
+} //<! end namespace;
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/transport/ClientRemotingProcessor.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/transport/ClientRemotingProcessor.cpp b/rocketmq-cpp/src/transport/ClientRemotingProcessor.cpp
new file mode 100644
index 0000000..a1462e6
--- /dev/null
+++ b/rocketmq-cpp/src/transport/ClientRemotingProcessor.cpp
@@ -0,0 +1,146 @@
+#include "ClientRemotingProcessor.h"
+#include "ClientRPCHook.h"
+#include "ConsumerRunningInfo.h"
+#include "MQClientFactory.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+
+ClientRemotingProcessor::ClientRemotingProcessor(
+ MQClientFactory* mqClientFactory)
+ : m_mqClientFactory(mqClientFactory) {}
+
+ClientRemotingProcessor::~ClientRemotingProcessor() {}
+
+RemotingCommand* ClientRemotingProcessor::processRequest(
+ const string& addr, RemotingCommand* request) {
+ LOG_DEBUG("request Command received:processRequest");
+ switch (request->getCode()) {
+ case CHECK_TRANSACTION_STATE:
+ // return checkTransactionState( request);
+ break;
+ case NOTIFY_CONSUMER_IDS_CHANGED:
+ return notifyConsumerIdsChanged(request);
+ break;
+ case RESET_CONSUMER_CLIENT_OFFSET: // oneWayRPC
+ return resetOffset(request);
+ case GET_CONSUMER_STATUS_FROM_CLIENT:
+ // return getConsumeStatus( request);
+ break;
+ case GET_CONSUMER_RUNNING_INFO:
+ return getConsumerRunningInfo(addr, request);
+ break;
+ case CONSUME_MESSAGE_DIRECTLY:
+ // return consumeMessageDirectly( request);
+ break;
+ default:
+ break;
+ }
+ return NULL;
+}
+
+RemotingCommand* ClientRemotingProcessor::resetOffset(
+ RemotingCommand* request) {
+ request->SetExtHeader(request->getCode());
+ const MemoryBlock* pbody = request->GetBody();
+ if (pbody->getSize()) {
+ ResetOffsetBody* offsetBody = ResetOffsetBody::Decode(pbody);
+ ResetOffsetRequestHeader* offsetHeader =
+ (ResetOffsetRequestHeader*)request->getCommandHeader();
+ if (offsetBody) {
+ m_mqClientFactory->resetOffset(offsetHeader->getGroup(),
+ offsetHeader->getTopic(),
+ offsetBody->getOffsetTable());
+ } else {
+ LOG_ERROR(
+ "resetOffset failed as received data could not be unserialized");
+ }
+ }
+ return NULL; // as resetOffset is oneWayRPC, do not need return any response
+}
+
+std::map<MQMessageQueue, int64> ResetOffsetBody::getOffsetTable() {
+ return m_offsetTable;
+}
+
+void ResetOffsetBody::setOffsetTable(MQMessageQueue mq, int64 offset) {
+ m_offsetTable[mq] = offset;
+}
+
+ResetOffsetBody* ResetOffsetBody::Decode(const MemoryBlock* mem) {
+ const char* const pData = static_cast<const char*>(mem->getData());
+ Json::Reader reader;
+ Json::Value root;
+ const char* begin = pData;
+ const char* end = pData + mem->getSize();
+
+ if (!reader.parse(begin, end, root, true)) {
+ LOG_ERROR("ResetOffsetBody::Decode fail");
+ return NULL;
+ }
+
+ ResetOffsetBody* rfb = new ResetOffsetBody();
+ Json::Value qds = root["offsetTable"];
+ for (unsigned int i = 0; i < qds.size(); i++) {
+ MQMessageQueue mq;
+ Json::Value qd = qds[i];
+ mq.setBrokerName(qd["brokerName"].asString());
+ mq.setQueueId(qd["queueId"].asInt());
+ mq.setTopic(qd["topic"].asString());
+ int64 offset = qd["offset"].asInt();
+ LOG_INFO("ResetOffsetBody brokerName:%s, queueID:%d, topic:%s, offset:%lld",
+ mq.getBrokerName().c_str(), mq.getQueueId(), mq.getTopic().c_str(),
+ offset);
+ rfb->setOffsetTable(mq, offset);
+ }
+ return rfb;
+}
+
+RemotingCommand* ClientRemotingProcessor::getConsumerRunningInfo(
+ const string& addr, RemotingCommand* request) {
+ request->SetExtHeader(request->getCode());
+ GetConsumerRunningInfoRequestHeader* requestHeader =
+ (GetConsumerRunningInfoRequestHeader*)request->getCommandHeader();
+ LOG_INFO("getConsumerRunningInfo:%s",
+ requestHeader->getConsumerGroup().c_str());
+
+ RemotingCommand* pResponse = new RemotingCommand(
+ request->getCode(), "CPP", request->getVersion(), request->getOpaque(),
+ request->getFlag(), request->getRemark(), NULL);
+
+ unique_ptr<ConsumerRunningInfo> runningInfo(
+ m_mqClientFactory->consumerRunningInfo(
+ requestHeader->getConsumerGroup()));
+ if (runningInfo) {
+ if (requestHeader->isJstackEnable()) {
+ /*string jstack = UtilAll::jstack();
+ consumerRunningInfo->setJstack(jstack);*/
+ }
+ pResponse->setCode(SUCCESS_VALUE);
+ string body = runningInfo->encode();
+ pResponse->SetBody(body.c_str(), body.length());
+ pResponse->setMsgBody(body);
+ } else {
+ pResponse->setCode(SYSTEM_ERROR);
+ pResponse->setRemark("The Consumer Group not exist in this consumer");
+ }
+
+ SessionCredentials sessionCredentials;
+ m_mqClientFactory->getSessionCredentialFromConsumer(
+ requestHeader->getConsumerGroup(), sessionCredentials);
+ ClientRPCHook rpcHook(sessionCredentials);
+ rpcHook.doBeforeRequest(addr, *pResponse);
+ pResponse->Encode();
+ return pResponse;
+}
+
+RemotingCommand* ClientRemotingProcessor::notifyConsumerIdsChanged(
+ RemotingCommand* request) {
+ request->SetExtHeader(request->getCode());
+ NotifyConsumerIdsChangedRequestHeader* requestHeader =
+ (NotifyConsumerIdsChangedRequestHeader*)request->getCommandHeader();
+ LOG_INFO("notifyConsumerIdsChanged:%s", requestHeader->getGroup().c_str());
+ m_mqClientFactory->doRebalanceByConsumerGroup(requestHeader->getGroup());
+ return NULL;
+}
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/transport/ClientRemotingProcessor.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/transport/ClientRemotingProcessor.h b/rocketmq-cpp/src/transport/ClientRemotingProcessor.h
new file mode 100644
index 0000000..1bb757e
--- /dev/null
+++ b/rocketmq-cpp/src/transport/ClientRemotingProcessor.h
@@ -0,0 +1,39 @@
+#ifndef __CLIENTREMOTINGPROCESSOR_H__
+#define __CLIENTREMOTINGPROCESSOR_H__
+
+#include "MQMessageQueue.h"
+#include "MQProtos.h"
+#include "RemotingCommand.h"
+
+namespace rocketmq {
+
+class MQClientFactory;
+class ClientRemotingProcessor {
+ public:
+ ClientRemotingProcessor(MQClientFactory* mqClientFactory);
+ virtual ~ClientRemotingProcessor();
+
+ RemotingCommand* processRequest(const string& addr, RemotingCommand* request);
+ RemotingCommand* resetOffset(RemotingCommand* request);
+ RemotingCommand* getConsumerRunningInfo(const string& addr,
+ RemotingCommand* request);
+ RemotingCommand* notifyConsumerIdsChanged(RemotingCommand* request);
+
+ private:
+ MQClientFactory* m_mqClientFactory;
+};
+
+class ResetOffsetBody {
+ public:
+ ResetOffsetBody() {}
+ virtual ~ResetOffsetBody() { m_offsetTable.clear(); }
+ void setOffsetTable(MQMessageQueue mq, int64 offset);
+ std::map<MQMessageQueue, int64> getOffsetTable();
+ static ResetOffsetBody* Decode(const MemoryBlock* mem);
+
+ private:
+ std::map<MQMessageQueue, int64> m_offsetTable;
+};
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/transport/ResponseFuture.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/transport/ResponseFuture.cpp b/rocketmq-cpp/src/transport/ResponseFuture.cpp
new file mode 100755
index 0000000..05cef84
--- /dev/null
+++ b/rocketmq-cpp/src/transport/ResponseFuture.cpp
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ResponseFuture.h"
+#include "Logging.h"
+#include "TcpRemotingClient.h"
+
+namespace rocketmq {
+//<!************************************************************************
+ResponseFuture::ResponseFuture(int requestCode, int opaque,
+ TcpRemotingClient* powner, int64 timeout,
+ bool bAsync /* = false */,
+ AsyncCallbackWrap* pcall /* = NULL */) {
+ m_bAsync.store(bAsync);
+ m_requestCode = requestCode;
+ m_opaque = opaque;
+ m_timeout = timeout;
+ m_pCallbackWrap = pcall;
+ m_pResponseCommand = NULL;
+ m_sendRequestOK = false;
+ m_beginTimestamp = UtilAll::currentTimeMillis();
+ m_asyncCallbackStatus = asyncCallBackStatus_init;
+ if (getASyncFlag()) {
+ m_asyncResponse.store(false);
+ m_syncResponse.store(true);
+ } else {
+ m_asyncResponse.store(true);
+ m_syncResponse.store(false);
+ }
+}
+
+ResponseFuture::~ResponseFuture() {
+ deleteAndZero(m_pCallbackWrap);
+ /*
+ do not set m_pResponseCommand to NULL when destruct, as m_pResponseCommand
+ is used by MQClientAPIImpl concurrently, and will be released by producer or
+ consumer;
+ m_pResponseCommand = NULL;
+ */
+}
+
+void ResponseFuture::releaseThreadCondition() { m_defaultEvent.notify_all(); }
+
+RemotingCommand* ResponseFuture::waitResponse(int timeoutMillis) {
+ boost::unique_lock<boost::mutex> lk(m_defaultEventLock);
+ if (!m_defaultEvent.timed_wait(
+ lk, boost::posix_time::milliseconds(timeoutMillis))) {
+ LOG_WARN("waitResponse of code:%d with opaque:%d timeout", m_requestCode,
+ m_opaque);
+ m_syncResponse.store(true);
+ }
+ return m_pResponseCommand;
+}
+
+void ResponseFuture::setResponse(RemotingCommand* pResponseCommand) {
+ // LOG_DEBUG("setResponse of opaque:%d",m_opaque);
+ m_pResponseCommand = pResponseCommand;
+
+ if (!getASyncFlag()) {
+ if (m_syncResponse.load() == false) {
+ m_defaultEvent.notify_all();
+ m_syncResponse.store(true);
+ }
+ }
+}
+
+const bool ResponseFuture::getSyncResponseFlag() {
+ if (m_syncResponse.load() == true) {
+ return true;
+ }
+ return false;
+}
+
+const bool ResponseFuture::getAsyncResponseFlag() {
+ if (m_asyncResponse.load() == true) {
+ // LOG_DEBUG("ASYNC flag is TRUE,opaque is:%d",getOpaque() );
+ return true;
+ }
+
+ return false;
+}
+
+void ResponseFuture::setAsyncResponseFlag() { m_asyncResponse.store(true); }
+
+const bool ResponseFuture::getASyncFlag() {
+ if (m_bAsync.load() == true) {
+ // LOG_DEBUG("ASYNC flag is TRUE,opaque is:%d",getOpaque() );
+ return true;
+ }
+ return false;
+}
+
+bool ResponseFuture::isSendRequestOK() { return m_sendRequestOK; }
+
+void ResponseFuture::setSendRequestOK(bool sendRequestOK) {
+ m_sendRequestOK = sendRequestOK;
+}
+
+int ResponseFuture::getOpaque() const { return m_opaque; }
+
+int ResponseFuture::getRequestCode() const { return m_requestCode; }
+
+void ResponseFuture::setAsyncCallBackStatus(
+ asyncCallBackStatus asyncCallbackStatus) {
+ boost::lock_guard<boost::mutex> lock(m_asyncCallbackLock);
+ if (m_asyncCallbackStatus == asyncCallBackStatus_init) {
+ m_asyncCallbackStatus = asyncCallbackStatus;
+ }
+}
+
+void ResponseFuture::executeInvokeCallback() {
+ if (m_pCallbackWrap == NULL) {
+ deleteAndZero(m_pResponseCommand);
+ return;
+ } else {
+ if (m_asyncCallbackStatus == asyncCallBackStatus_response) {
+ m_pCallbackWrap->operationComplete(this, true);
+ } else {
+ if (m_pResponseCommand)
+ deleteAndZero(m_pResponseCommand); // the responseCommand from
+ // RemotingCommand::Decode(mem) will
+ // only deleted by operationComplete
+ // automatically
+ LOG_WARN(
+ "timeout and response incoming concurrently of opaque:%d, and "
+ "executeInvokeCallbackException was called earlier",
+ m_opaque);
+ }
+ }
+}
+
+void ResponseFuture::executeInvokeCallbackException() {
+ if (m_pCallbackWrap == NULL) {
+ LOG_ERROR("m_pCallbackWrap is NULL, critical error");
+ return;
+ } else {
+ if (m_asyncCallbackStatus == asyncCallBackStatus_timeout) {
+ m_pCallbackWrap->onException();
+ } else {
+ LOG_WARN(
+ "timeout and response incoming concurrently of opaque:%d, and "
+ "executeInvokeCallback was called earlier",
+ m_opaque);
+ }
+ }
+}
+
+bool ResponseFuture::isTimeOut() const {
+ int64 diff = UtilAll::currentTimeMillis() - m_beginTimestamp;
+ //<!only async;
+ return m_bAsync.load() == 1 && diff > m_timeout;
+}
+
+RemotingCommand* ResponseFuture::getCommand() const {
+ return m_pResponseCommand;
+}
+
+AsyncCallbackWrap* ResponseFuture::getAsyncCallbackWrap() {
+ return m_pCallbackWrap;
+}
+
+//<!************************************************************************
+} //<!end namespace;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/transport/ResponseFuture.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/transport/ResponseFuture.h b/rocketmq-cpp/src/transport/ResponseFuture.h
new file mode 100755
index 0000000..92fa772
--- /dev/null
+++ b/rocketmq-cpp/src/transport/ResponseFuture.h
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __RESPONSEFUTURE_H__
+#define __RESPONSEFUTURE_H__
+#include <boost/atomic.hpp>
+#include <boost/thread/condition_variable.hpp>
+#include "AsyncCallbackWrap.h"
+#include "RemotingCommand.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+
+typedef enum asyncCallBackStatus {
+ asyncCallBackStatus_init = 0,
+ asyncCallBackStatus_response = 1,
+ asyncCallBackStatus_timeout = 2
+} asyncCallBackStatus;
+
+class TcpRemotingClient;
+//<!***************************************************************************
+class ResponseFuture {
+ public:
+ ResponseFuture(int requestCode, int opaque, TcpRemotingClient* powner,
+ int64 timeoutMilliseconds, bool bAsync = false,
+ AsyncCallbackWrap* pcall = NULL);
+ virtual ~ResponseFuture();
+ void releaseThreadCondition();
+ RemotingCommand* waitResponse(int timeoutMillis);
+ RemotingCommand* getCommand() const;
+
+ void setResponse(RemotingCommand* pResponseCommand);
+ bool isSendRequestOK();
+ void setSendRequestOK(bool sendRequestOK);
+ int getRequestCode() const;
+ int getOpaque() const;
+
+ //<!callback;
+ void executeInvokeCallback();
+ void executeInvokeCallbackException();
+ bool isTimeOut() const;
+ // bool isTimeOutMoreThan30s() const;
+ const bool getASyncFlag();
+ void setAsyncResponseFlag();
+ const bool getAsyncResponseFlag();
+ const bool getSyncResponseFlag();
+ AsyncCallbackWrap* getAsyncCallbackWrap();
+ void setAsyncCallBackStatus(asyncCallBackStatus asyncCallbackStatus);
+
+ private:
+ int m_requestCode;
+ int m_opaque;
+ bool m_sendRequestOK;
+ boost::mutex m_defaultEventLock;
+ boost::condition_variable_any m_defaultEvent;
+ int64 m_beginTimestamp;
+ int64 m_timeout; // ms
+ boost::atomic<bool> m_bAsync;
+ RemotingCommand* m_pResponseCommand; //<!delete outside;
+ AsyncCallbackWrap* m_pCallbackWrap;
+ boost::mutex m_asyncCallbackLock;
+ asyncCallBackStatus m_asyncCallbackStatus;
+ boost::atomic<bool> m_asyncResponse;
+ boost::atomic<bool> m_syncResponse;
+ // TcpRemotingClient* m_tcpRemoteClient;
+};
+//<!************************************************************************
+} //<!end namespace;
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/transport/SocketUtil.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/transport/SocketUtil.cpp b/rocketmq-cpp/src/transport/SocketUtil.cpp
new file mode 100755
index 0000000..d428f24
--- /dev/null
+++ b/rocketmq-cpp/src/transport/SocketUtil.cpp
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "SocketUtil.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+sockaddr IPPort2socketAddress(int host, int port) {
+ struct sockaddr_in sa;
+ sa.sin_family = AF_INET;
+ sa.sin_port = htons((uint16)port);
+ sa.sin_addr.s_addr = htonl(host);
+
+ sockaddr bornAddr;
+ memcpy(&bornAddr, &sa, sizeof(sockaddr));
+ return bornAddr;
+}
+
+string socketAddress2IPPort(sockaddr addr) {
+ sockaddr_in sa;
+ memcpy(&sa, &addr, sizeof(sockaddr));
+
+ char tmp[32];
+ sprintf(tmp, "%s:%d", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port));
+
+ string ipport = tmp;
+ return ipport;
+}
+
+void socketAddress2IPPort(sockaddr addr, int& host, int& port) {
+ struct sockaddr_in sa;
+ memcpy(&sa, &addr, sizeof(sockaddr));
+
+ host = ntohl(sa.sin_addr.s_addr);
+ port = ntohs(sa.sin_port);
+}
+
+string socketAddress2String(sockaddr addr) {
+ sockaddr_in in;
+ memcpy(&in, &addr, sizeof(sockaddr));
+
+ return inet_ntoa(in.sin_addr);
+}
+
+string getHostName(sockaddr addr) {
+ sockaddr_in in;
+ memcpy(&in, &addr, sizeof(sockaddr));
+
+ struct hostent* remoteHost = gethostbyaddr((char*)&(in.sin_addr), 4, AF_INET);
+ char** alias = remoteHost->h_aliases;
+ if (*alias != 0) {
+ return *alias;
+ } else {
+ return inet_ntoa(in.sin_addr);
+ }
+}
+
+uint64 swapll(uint64 v) {
+#ifdef ENDIANMODE_BIG
+ return v;
+#else
+ uint64 ret = ((v << 56) | ((v & 0xff00) << 40) | ((v & 0xff0000) << 24) |
+ ((v & 0xff000000) << 8) | ((v >> 8) & 0xff000000) |
+ ((v >> 24) & 0xff0000) | ((v >> 40) & 0xff00) | (v >> 56));
+
+ return ret;
+#endif
+}
+
+uint64 h2nll(uint64 v) { return swapll(v); }
+
+uint64 n2hll(uint64 v) { return swapll(v); }
+} //<!end namespace;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/transport/SocketUtil.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/transport/SocketUtil.h b/rocketmq-cpp/src/transport/SocketUtil.h
new file mode 100755
index 0000000..7cbba0c
--- /dev/null
+++ b/rocketmq-cpp/src/transport/SocketUtil.h
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __SOCKETUTIL_H__
+#define __SOCKETUTIL_H__
+
+#ifdef WIN32
+#include <WS2tcpip.h>
+#include <Windows.h>
+#include <Winsock2.h>
+#pragma comment(lib, "ws2_32.lib")
+#else
+#include <arpa/inet.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <net/if.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <signal.h>
+#include <sys/ioctl.h>
+#include <sys/select.h>
+#include <sys/socket.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
+#endif
+#include <sys/socket.h>
+#include "UtilAll.h"
+
+namespace rocketmq {
+//<!************************************************************************
+/**
+* IP:PORT
+*/
+sockaddr IPPort2socketAddress(int host, int port);
+string socketAddress2IPPort(sockaddr addr);
+void socketAddress2IPPort(sockaddr addr, int& host, int& port);
+
+string socketAddress2String(sockaddr addr);
+string getHostName(sockaddr addr);
+
+uint64 h2nll(uint64 v);
+uint64 n2hll(uint64 v);
+
+//<!************************************************************************
+} //<!end namespace;
+
+#endif