You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2017/09/04 05:04:48 UTC

[02/14] incubator-rocketmq-externals git commit: upload rocketmq-cpp code

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/sequencer.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/disruptor/sequencer.h b/rocketmq-cpp/src/thread/disruptor/sequencer.h
new file mode 100755
index 0000000..98d617f
--- /dev/null
+++ b/rocketmq-cpp/src/thread/disruptor/sequencer.h
@@ -0,0 +1,190 @@
+// Copyright (c) 2011, François Saint-Jacques
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//     * Redistributions of source code must retain the above copyright
+//       notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above copyright
+//       notice, this list of conditions and the following disclaimer in the
+//       documentation and/or other materials provided with the distribution.
+//     * Neither the name of the disruptor-- nor the
+//       names of its contributors may be used to endorse or promote products
+//       derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef DISRUPTOR_SEQUENCER_H_ // NOLINT
+#define DISRUPTOR_SEQUENCER_H_ // NOLINT
+
+#include <vector>
+
+#include "batch_descriptor.h"
+#include "claim_strategy.h"
+#include "interface.h"
+#include "sequence_barrier.h"
+#include "wait_strategy.h"
+
+namespace rocketmq {
+
+// Coordinator for claiming sequences for access to a data structures while
+// tracking dependent {@link Sequence}s
+class Sequencer: public boost::noncopyable {
+ public:
+    // Construct a Sequencer with the selected strategies.
+    //
+    // @param buffer_size over which sequences are valid.
+    // @param claim_strategy_option for those claiming sequences.
+    // @param wait_strategy_option for those waiting on sequences.
+    Sequencer(int buffer_size,
+              ClaimStrategyOption claim_strategy_option,
+              WaitStrategyOption wait_strategy_option) :
+            buffer_size_(buffer_size),
+            claim_strategy_(CreateClaimStrategy(claim_strategy_option,
+                                                buffer_size_)),
+            wait_strategy_(CreateWaitStrategy(wait_strategy_option)) { }
+
+    ~Sequencer() {
+        delete claim_strategy_;
+        delete wait_strategy_;
+    }
+
+    // Set the sequences that will gate publishers to prevent the buffer
+    // wrapping.
+    //
+    // @param sequences to be gated on.
+    void set_gating_sequences(
+            const std::vector<Sequence*>& sequences) {
+        gating_sequences_ = sequences;
+    }
+
+    // Create a {@link SequenceBarrier} that gates on the cursor and a list of
+    // {@link Sequence}s.
+    //
+    // @param sequences_to_track this barrier will track.
+    // @return the barrier gated as required.
+    ProcessingSequenceBarrier* NewBarrier(
+            const std::vector<Sequence*>& sequences_to_track) {
+        return new ProcessingSequenceBarrier(wait_strategy_, &cursor_,
+                                             sequences_to_track);
+    }
+
+    // Create a new {@link BatchDescriptor} that is the minimum of the
+    // requested size and the buffer_size.
+    //
+    // @param size for the new batch.
+    // @return the new {@link BatchDescriptor}.
+    BatchDescriptor* NewBatchDescriptor(const int& size) {
+        return new BatchDescriptor(size<buffer_size_?size:buffer_size_);
+    }
+
+    // The capacity of the data structure to hold entries.
+    //
+    // @return capacity of the data structure.
+    int buffer_size() { return buffer_size_; }
+
+
+    // Get the value of the cursor indicating the published sequence.
+    //
+    // @return value of the cursor for events that have been published.
+    int64_t GetCursor() { return cursor_.sequence(); }
+
+    // Has the buffer capacity left to allocate another sequence. This is a
+    // concurrent method so the response should only be taken as an indication
+    // of available capacity.
+    //
+    // @return true if the buffer has the capacity to allocated another event.
+    bool HasAvalaibleCapacity() {
+        return claim_strategy_->HasAvalaibleCapacity(gating_sequences_);
+    }
+
+    // Claim the next event in sequence for publishing to the {@link RingBuffer}.
+    //
+    // @return the claimed sequence.
+    int64_t Next() {
+        return claim_strategy_->IncrementAndGet(gating_sequences_);
+    }
+
+    // Claim the next batch of sequence numbers for publishing.
+    //
+    // @param batch_descriptor to be updated for the batch range.
+    // @return the updated batch_descriptor.
+    BatchDescriptor* Next(BatchDescriptor* batch_descriptor) {
+        int64_t sequence = claim_strategy_->IncrementAndGet(batch_descriptor->size(), gating_sequences_);
+        batch_descriptor->set_end(sequence);
+        return batch_descriptor;
+    }
+
+    // Claim a specific sequence when only one publisher is involved.
+    //
+    // @param sequence to be claimed.
+    // @return sequence just claime.
+    int64_t Claim(const int64_t& sequence) {
+        claim_strategy_->SetSequence(sequence, gating_sequences_);
+        return sequence;
+    }
+
+    // Publish an event and make it visible to {@link EventProcessor}s.
+    //
+    // @param sequence to be published.
+    void Publish(const int64_t& sequence) {
+        Publish(sequence, 1);
+    }
+
+    // Publish the batch of events in sequence.
+    //
+    // @param sequence to be published.
+    void Publish(const BatchDescriptor& batch_descriptor) {
+        Publish(batch_descriptor.end(), batch_descriptor.size());
+    }
+
+    // Force the publication of a cursor sequence.
+    //
+    // Only use this method when forcing a sequence and you are sure only one
+    // publisher exists. This will cause the cursor to advance to this
+    // sequence.
+    //
+    // @param sequence to which is to be forced for publication.
+    void ForcePublish(const int64_t& sequence) {
+        cursor_.set_sequence(sequence);
+        wait_strategy_->SignalAllWhenBlocking();
+    }
+
+    // TODO(fsaintjacques): This was added to overcome
+    // NoOpEventProcessor::GetSequence(), this is not a clean solution.
+    Sequence* GetSequencePtr() {
+        return &cursor_;
+    }
+
+ private:
+    // Helpers
+    void Publish(const int64_t& sequence, const int64_t& batch_size) {
+        //LOG_DEBUG("publish sequence:%d", sequence);
+        claim_strategy_->SerialisePublishing(sequence, cursor_, batch_size);
+        cursor_.set_sequence(sequence);
+        wait_strategy_->SignalAllWhenBlocking();
+    }
+
+    // Members
+    const int buffer_size_;
+
+    PaddedSequence cursor_;
+    std::vector<Sequence*> gating_sequences_;
+
+    ClaimStrategyInterface* claim_strategy_;
+    WaitStrategyInterface* wait_strategy_;
+
+};
+
+};  // namespace rocketmq
+
+#endif // DISRUPTOR_RING_BUFFER_H_ NOLINT

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/utils.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/disruptor/utils.h b/rocketmq-cpp/src/thread/disruptor/utils.h
new file mode 100755
index 0000000..0730093
--- /dev/null
+++ b/rocketmq-cpp/src/thread/disruptor/utils.h
@@ -0,0 +1,35 @@
+// Copyright (c) 2011, François Saint-Jacques
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//     * Redistributions of source code must retain the above copyright
+//       notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above copyright
+//       notice, this list of conditions and the following disclaimer in the
+//       documentation and/or other materials provided with the distribution.
+//     * Neither the name of the disruptor-- nor the
+//       names of its contributors may be used to endorse or promote products
+//       derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef DISRUPTOR_UTILS_H_ // NOLINT
+#define DISRUPTOR_UTILS_H_ // NOLINT
+
+// From Google C++ Standard, modified to use C++11 deleted functions.
+// A macro to disallow the copy constructor and operator= functions.
+#define DISALLOW_COPY_AND_ASSIGN(TypeName) \
+  TypeName(const TypeName&)  delete      \
+  void operator=(const TypeName&) delete;
+
+#endif // DISRUPTOR_UTILS_H_ NOLINT

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptor/wait_strategy.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/disruptor/wait_strategy.h b/rocketmq-cpp/src/thread/disruptor/wait_strategy.h
new file mode 100755
index 0000000..fb2e58a
--- /dev/null
+++ b/rocketmq-cpp/src/thread/disruptor/wait_strategy.h
@@ -0,0 +1,377 @@
+// Copyright (c) 2011, François Saint-Jacques
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are met:
+//     * Redistributions of source code must retain the above copyright
+//       notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above copyright
+//       notice, this list of conditions and the following disclaimer in the
+//       documentation and/or other materials provided with the distribution.
+//     * Neither the name of the disruptor-- nor the
+//       names of its contributors may be used to endorse or promote products
+//       derived from this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+// DISCLAIMED. IN NO EVENT SHALL FRANÇOIS SAINT-JACQUES BE LIABLE FOR ANY
+// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+#ifndef DISRUPTOR_WAITSTRATEGY_H_  // NOLINT
+#define DISRUPTOR_WAITSTRATEGY_H_  // NOLINT
+
+#include <sys/time.h>
+
+#include <boost/chrono.hpp>
+#include <boost/thread.hpp>
+#include <vector>
+
+#include "exceptions.h"
+#include "interface.h"
+#include "sequence.h"
+
+namespace rocketmq {
+
+// Strategy options which are available to those waiting on a
+// {@link RingBuffer}
+enum WaitStrategyOption {
+    // This strategy uses a condition variable inside a lock to block the
+    // event procesor which saves CPU resource at the expense of lock
+    // contention.
+    kBlockingStrategy,
+    // This strategy uses a progressive back off strategy by first spinning,
+    // then yielding, then sleeping for 1ms period. This is a good strategy
+    // for burst traffic then quiet periods when latency is not critical.
+    kSleepingStrategy,
+    // This strategy calls Thread.yield() in a loop as a waiting strategy
+    // which reduces contention at the expense of CPU resource.
+    kYieldingStrategy,
+    // This strategy call spins in a loop as a waiting strategy which is
+    // lowest and most consistent latency but ties up a CPU.
+    kBusySpinStrategy
+};
+
+// Blocking strategy that uses a lock and condition variable for
+// {@link Consumer}s waiting on a barrier.
+// This strategy should be used when performance and low-latency are not as
+// important as CPU resource.
+class BlockingStrategy :  public WaitStrategyInterface {
+ public:
+    BlockingStrategy() {}
+
+    virtual int64_t WaitFor(const std::vector<Sequence*>& dependents,
+                            const Sequence& cursor,
+                            const SequenceBarrierInterface& barrier,
+                            const int64_t& sequence) {
+        int64_t available_sequence = 0;
+        // We need to wait.
+        if ((available_sequence = cursor.sequence()) < sequence) {
+            // acquire lock
+            boost::unique_lock<boost::recursive_mutex> ulock(mutex_);
+            while ((available_sequence = cursor.sequence()) < sequence) {
+                barrier.CheckAlert();
+                consumer_notify_condition_.wait(ulock);
+            }
+        } // unlock happens here, on ulock destruction.
+
+        if (0 != dependents.size()) {
+            while ((available_sequence = GetMinimumSequence(dependents)) < \
+                    sequence) {
+                barrier.CheckAlert();
+            }
+        }
+
+        return available_sequence;
+    }
+
+    virtual int64_t WaitFor(const std::vector<Sequence*>& dependents,
+                            const Sequence& cursor,
+                            const SequenceBarrierInterface& barrier,
+                            const int64_t& sequence,
+                            const int64_t& timeout_micros) {
+        int64_t available_sequence = 0;
+        // We have to wait
+        if ((available_sequence = cursor.sequence()) < sequence) {
+            boost::unique_lock<boost::recursive_mutex> ulock(mutex_);
+            while ((available_sequence = cursor.sequence()) < sequence) {
+                barrier.CheckAlert();
+                if (boost::cv_status::timeout == consumer_notify_condition_.wait_for(ulock,
+                    boost::chrono::microseconds(timeout_micros)))
+                    break;
+
+            }
+        } // unlock happens here, on ulock destruction
+
+        if (0 != dependents.size()) {
+            while ((available_sequence = GetMinimumSequence(dependents)) \
+                    < sequence) {
+                barrier.CheckAlert();
+            }
+        }
+
+        return available_sequence;
+    }
+
+    virtual void SignalAllWhenBlocking() {
+        boost::unique_lock<boost::recursive_mutex> ulock(mutex_);
+        consumer_notify_condition_.notify_all();
+    }
+
+ private:
+    boost::recursive_mutex mutex_;
+    boost::condition_variable_any consumer_notify_condition_;
+
+};
+
+// Sleeping strategy
+class SleepingStrategy :  public WaitStrategyInterface {
+ public:
+    SleepingStrategy() {}
+
+    virtual int64_t WaitFor(const std::vector<Sequence*>& dependents,
+                            const Sequence& cursor,
+                            const SequenceBarrierInterface& barrier,
+                            const int64_t& sequence) {
+        int64_t available_sequence = 0;
+        int counter = kRetries;
+
+        if (0 == dependents.size()) {
+            while ((available_sequence = cursor.sequence()) < sequence) {
+                counter = ApplyWaitMethod(barrier, counter);
+            }
+        } else {
+            while ((available_sequence = GetMinimumSequence(dependents)) < \
+                    sequence) {
+                counter = ApplyWaitMethod(barrier, counter);
+            }
+        }
+
+        return available_sequence;
+    }
+
+    virtual int64_t WaitFor(const std::vector<Sequence*>& dependents,
+                            const Sequence& cursor,
+                            const SequenceBarrierInterface& barrier,
+                            const int64_t& sequence,
+                            const int64_t& timeout_micros) {
+        // timing
+        struct timeval start_time, end_time;
+        gettimeofday(&start_time, NULL);
+        int64_t start_micro = start_time.tv_sec*1000000 + start_time.tv_usec;
+
+        int64_t available_sequence = 0;
+        int counter = kRetries;
+
+        if (0 == dependents.size()) {
+            while ((available_sequence = cursor.sequence()) < sequence) {
+                counter = ApplyWaitMethod(barrier, counter);
+                gettimeofday(&end_time, NULL);
+                int64_t end_micro = end_time.tv_sec*1000000 + end_time.tv_usec;
+                if (timeout_micros < (end_micro - start_micro))
+                    break;
+            }
+        } else {
+            while ((available_sequence = GetMinimumSequence(dependents)) < \
+                    sequence) {
+                counter = ApplyWaitMethod(barrier, counter);
+                gettimeofday(&end_time, NULL);
+                int64_t end_micro = end_time.tv_sec*1000000 + end_time.tv_usec;
+                if (timeout_micros < (end_micro - start_micro))
+                    break;
+            }
+        }
+
+        return available_sequence;
+    }
+
+    virtual void SignalAllWhenBlocking() {}
+
+    static const int kRetries = 200;
+
+ private:
+    int ApplyWaitMethod(const SequenceBarrierInterface& barrier, int counter) {
+        barrier.CheckAlert();
+        if (counter > 100) {
+            counter--;
+        } else if (counter > 0) {
+            counter--;
+            boost::this_thread::yield();
+        } else {
+            boost::this_thread::sleep_for(boost::chrono::milliseconds(1));
+        }
+
+        return counter;
+    }
+
+};
+
+// Yielding strategy that uses a sleep(0) for {@link EventProcessor}s waiting
+// on a barrier. This strategy is a good compromise between performance and
+// CPU resource.
+class YieldingStrategy :  public WaitStrategyInterface {
+ public:
+    YieldingStrategy() {}
+
+    virtual int64_t WaitFor(const std::vector<Sequence*>& dependents,
+                            const Sequence& cursor,
+                            const SequenceBarrierInterface& barrier,
+                            const int64_t& sequence) {
+        int64_t available_sequence = 0;
+        int counter = kSpinTries;
+
+        if (0 == dependents.size()) {
+            while ((available_sequence = cursor.sequence()) < sequence) {
+                counter = ApplyWaitMethod(barrier, counter);
+            }
+        } else {
+            while ((available_sequence = GetMinimumSequence(dependents)) < \
+                    sequence) {
+                counter = ApplyWaitMethod(barrier, counter);
+            }
+        }
+
+        return available_sequence;
+    }
+
+    virtual int64_t WaitFor(const std::vector<Sequence*>& dependents,
+                            const Sequence& cursor,
+                            const SequenceBarrierInterface& barrier,
+                            const int64_t& sequence,
+                            const int64_t& timeout_micros) {
+        struct timeval start_time, end_time;
+        gettimeofday(&start_time, NULL);
+        int64_t start_micro = start_time.tv_sec*1000000 + start_time.tv_usec;
+
+        int64_t available_sequence = 0;
+        int counter = kSpinTries;
+
+        if (0 == dependents.size()) {
+            while ((available_sequence = cursor.sequence()) < sequence) {
+                counter = ApplyWaitMethod(barrier, counter);
+                gettimeofday(&end_time, NULL);
+                int64_t end_micro = end_time.tv_sec*1000000 + end_time.tv_usec;
+                if (timeout_micros < (end_micro - start_micro))
+                    break;
+            }
+        } else {
+            while ((available_sequence = GetMinimumSequence(dependents)) < \
+                    sequence) {
+                counter = ApplyWaitMethod(barrier, counter);
+                gettimeofday(&end_time, NULL);
+                int64_t end_micro = end_time.tv_sec*1000000 + end_time.tv_usec;
+                if (timeout_micros < (end_micro - start_micro))
+                    break;
+            }
+        }
+
+        return available_sequence;
+    }
+
+    virtual void SignalAllWhenBlocking() {}
+
+    static const int kSpinTries = 100;
+
+ private:
+    int ApplyWaitMethod(const SequenceBarrierInterface& barrier, int counter) {
+        barrier.CheckAlert();
+        if (counter == 0) {
+            boost::this_thread::yield();
+        } else {
+            counter--;
+        }
+
+        return counter;
+    }
+
+};
+
+
+// Busy Spin strategy that uses a busy spin loop for {@link EventProcessor}s
+// waiting on a barrier.
+// This strategy will use CPU resource to avoid syscalls which can introduce
+// latency jitter.  It is best used when threads can be bound to specific
+// CPU cores.
+class BusySpinStrategy :  public WaitStrategyInterface {
+ public:
+    BusySpinStrategy() {}
+
+    virtual int64_t WaitFor(const std::vector<Sequence*>& dependents,
+                            const Sequence& cursor,
+                            const SequenceBarrierInterface& barrier,
+                            const int64_t& sequence) {
+        int64_t available_sequence = 0;
+        if (0 == dependents.size()) {
+            while ((available_sequence = cursor.sequence()) < sequence) {
+                barrier.CheckAlert();
+            }
+        } else {
+            while ((available_sequence = GetMinimumSequence(dependents)) < \
+                    sequence) {
+                barrier.CheckAlert();
+            }
+        }
+
+        return available_sequence;
+    }
+
+    virtual int64_t WaitFor(const std::vector<Sequence*>& dependents,
+                            const Sequence& cursor,
+                            const SequenceBarrierInterface& barrier,
+                            const int64_t& sequence,
+                            const int64_t& timeout_micros) {
+        struct timeval start_time, end_time;
+        gettimeofday(&start_time, NULL);
+        int64_t start_micro = start_time.tv_sec*1000000 + start_time.tv_usec;
+        int64_t available_sequence = 0;
+
+        if (0 == dependents.size()) {
+            while ((available_sequence = cursor.sequence()) < sequence) {
+                barrier.CheckAlert();
+                gettimeofday(&end_time, NULL);
+                int64_t end_micro = end_time.tv_sec*1000000 + end_time.tv_usec;
+                if (timeout_micros < (end_micro - start_micro))
+                    break;
+            }
+        } else {
+            while ((available_sequence = GetMinimumSequence(dependents)) < \
+                    sequence) {
+                barrier.CheckAlert();
+                gettimeofday(&end_time, NULL);
+                int64_t end_micro = end_time.tv_sec*1000000 + end_time.tv_usec;
+                if (timeout_micros < (end_micro - start_micro))
+                    break;
+            }
+        }
+
+        return available_sequence;
+    }
+
+    virtual void SignalAllWhenBlocking() {}
+
+};
+
+WaitStrategyInterface* CreateWaitStrategy(WaitStrategyOption wait_option) {
+    switch (wait_option) {
+        case kBlockingStrategy:
+            return new BlockingStrategy();
+        case kSleepingStrategy:
+            return new SleepingStrategy();
+        case kYieldingStrategy:
+            return new YieldingStrategy();
+        case kBusySpinStrategy:
+            return new BusySpinStrategy();
+        default:
+            return NULL;
+    }
+}
+
+
+};  // namespace rocketmq
+
+#endif // DISRUPTOR_WAITSTRATEGY_H_  NOLINT

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/disruptorLFQ.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/disruptorLFQ.h b/rocketmq-cpp/src/thread/disruptorLFQ.h
new file mode 100644
index 0000000..a05b0cd
--- /dev/null
+++ b/rocketmq-cpp/src/thread/disruptorLFQ.h
@@ -0,0 +1,113 @@
+#ifndef _DISRUPTORLFQ_
+#define _DISRUPTORLFQ_
+
+#include <disruptor/event_processor.h>
+#include <disruptor/event_publisher.h>
+#include <disruptor/exception_handler.h>
+#include <disruptor/interface.h>
+
+#include <boost/asio/io_service.hpp>
+#include <boost/bind.hpp>
+#include <boost/scoped_ptr.hpp>
+#include <boost/thread/thread.hpp>
+
+namespace rocketmq {
+class Task;
+class taskEventFactory : public EventFactoryInterface<Task> {
+ public:
+  virtual Task* NewInstance(const int& size) const;
+};
+
+class taskBatchHandler : public EventHandlerInterface<Task> {
+ public:
+  taskBatchHandler(int pullMsgThreadPoolNum);
+  virtual ~taskBatchHandler() {}
+
+  virtual void OnEvent(const int64_t& sequence, const bool& end_of_batch,
+                       Task* event);
+  virtual void OnStart() {}
+  virtual void OnShutdown() {}
+  void runTaskEvent(Task event, int64_t sequence);
+  void stopIOService();
+
+ private:
+  boost::asio::io_service m_ioService;
+  boost::thread_group m_threadpool;
+  boost::asio::io_service::work m_ioServiceWork;
+};
+
+class taskEventTranslator : public EventTranslatorInterface<Task> {
+ public:
+  taskEventTranslator(Task* event);
+  virtual ~taskEventTranslator() {}
+  virtual Task* TranslateTo(const int64_t& sequence, Task* event);
+
+ private:
+  Task* m_taskEvent;
+};
+
+class taskExceptionHandler : public ExceptionHandlerInterface<Task> {
+ public:
+  virtual void Handle(const std::exception& exception, const int64_t& sequence,
+                      Task* event) {}
+};
+
+class disruptorLFQ {
+ public:
+  disruptorLFQ(int threadCount) {
+    m_task_factory.reset(new taskEventFactory());
+    m_ring_buffer.reset(new RingBuffer<Task>(
+        m_task_factory.get(),
+        1024,  // default size is 1024, must be n power of 2
+        kSingleThreadedStrategy,
+        // metaq::kBusySpinStrategy);//load normal, high cpu occupy, and
+        // smallest consume latency
+        // metaq::kYieldingStrategy); //load normal, high cpu occupy, and
+        // smaller consume latency
+        // metaq::kSleepingStrategy);//load normal, lowest cpu occupy, but
+        // largest consume latency
+        kBlockingStrategy));  // load normal, lowest CPU occupy, but
+                                     // largest consume latency
+
+    m_sequence_to_track.reset(new std::vector<Sequence*>(0));
+    m_sequenceBarrier.reset(
+        m_ring_buffer->NewBarrier(*(m_sequence_to_track.get())));
+
+    m_task_handler.reset(new taskBatchHandler(threadCount));
+    m_task_exception_handler.reset(new taskExceptionHandler());
+    m_processor.reset(new BatchEventProcessor<Task>(
+        m_ring_buffer.get(),
+        (SequenceBarrierInterface*)m_sequenceBarrier.get(),
+        m_task_handler.get(), m_task_exception_handler.get()));
+
+    /*
+    Publisher will try to publish BUFFER_SIZE + 1 events.
+    The last event should wait for at least one consume before publishing, thus
+    preventing an overwrite.
+    After the single consume, the publisher should resume and publish the last
+    event.
+    */
+    m_gating_sequences.push_back(m_processor.get()->GetSequence());
+    m_ring_buffer->set_gating_sequences(
+        m_gating_sequences);  // prevent overlap, publishEvent will be blocked
+                              // on ring_buffer_->Next();
+
+    m_publisher.reset(new EventPublisher<Task>(m_ring_buffer.get()));
+  }
+  virtual ~disruptorLFQ() {}
+
+ public:
+  boost::scoped_ptr<taskEventFactory> m_task_factory;
+  boost::scoped_ptr<taskBatchHandler> m_task_handler;
+  boost::scoped_ptr<taskExceptionHandler> m_task_exception_handler;
+  boost::scoped_ptr<std::vector<Sequence*>> m_sequence_to_track;
+  boost::scoped_ptr<RingBuffer<Task>> m_ring_buffer;
+  boost::scoped_ptr<ProcessingSequenceBarrier> m_sequenceBarrier;
+  boost::scoped_ptr<BatchEventProcessor<Task>> m_processor;
+  boost::scoped_ptr<EventPublisher<Task>> m_publisher;
+  std::vector<Sequence*> m_gating_sequences;
+};
+}
+//<!***************************************************************************
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/task_queue.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/task_queue.cpp b/rocketmq-cpp/src/thread/task_queue.cpp
new file mode 100755
index 0000000..510425c
--- /dev/null
+++ b/rocketmq-cpp/src/thread/task_queue.cpp
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "task_queue.h"
+#include <sys/prctl.h>
+#include "UtilAll.h"
+#include "disruptorLFQ.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+Task* taskEventFactory::NewInstance(const int& size) const {
+  return new Task[size];
+}
+
+taskBatchHandler::taskBatchHandler(int pullMsgThreadPoolNum)
+    : m_ioServiceWork(m_ioService) {
+  string taskName = UtilAll::getProcessName();
+  prctl(PR_SET_NAME, "PullMsgTP", 0, 0, 0);
+  for (int i = 0; i != pullMsgThreadPoolNum; ++i) {
+    m_threadpool.create_thread(
+        boost::bind(&boost::asio::io_service::run, &m_ioService));
+  }
+  prctl(PR_SET_NAME, taskName.c_str(), 0, 0, 0);
+}
+
+void taskBatchHandler::OnEvent(const int64_t& sequence,
+                               const bool& end_of_batch, Task* event) {
+   //cp Task event out, avoid publish event override current Task event                               
+  Task currentTask(*event);
+  m_ioService.post(boost::bind(&taskBatchHandler::runTaskEvent, this,
+                               currentTask, sequence));
+}
+
+void taskBatchHandler::runTaskEvent(Task event, int64_t sequence) {
+  // LOG_INFO("processor event sequence:%lld",  sequence);
+  event.run();
+}
+
+void taskBatchHandler::stopIOService() {
+  m_ioService.stop();
+  m_threadpool.join_all();
+}
+
+taskEventTranslator::taskEventTranslator(Task* event) : m_taskEvent(event) {}
+
+Task* taskEventTranslator::TranslateTo(const int64_t& sequence, Task* event) {
+  // LOG_INFO("publish sequence:%lld, event:%x", sequence, event);
+  *event = *m_taskEvent;
+  return event;
+};
+
+//******************************************************************************************8
+TaskQueue::TaskQueue(int threadCount) {
+  m_flag.store(true, boost::memory_order_release);
+  m_disruptorLFQ = new disruptorLFQ(threadCount);
+}
+
+TaskQueue::~TaskQueue() {
+  delete m_disruptorLFQ;
+  m_disruptorLFQ = NULL;
+}
+
+void TaskQueue::close() {
+  m_flag.store(false, boost::memory_order_release);
+  m_disruptorLFQ->m_task_handler->stopIOService();
+  m_disruptorLFQ->m_processor->Halt();
+}
+
+bool TaskQueue::bTaskQueueStatusOK() {
+  return m_flag.load(boost::memory_order_acquire) == true;
+}
+
+void TaskQueue::produce(const Task& task) {
+  boost::mutex::scoped_lock lock(m_publishLock);
+  taskEventTranslator pTranslator(const_cast<Task*>(&task));
+  m_disruptorLFQ->m_publisher->PublishEvent(&pTranslator);
+}
+
+int TaskQueue::run() {
+  while (true) {
+    m_disruptorLFQ->m_processor->Run();
+    if (m_flag.load(boost::memory_order_acquire) == false) {
+      break;
+    }
+  }
+  return 0;
+}
+
+//<!***************************************************************************
+}  //<! end namespace;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/thread/task_queue.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/thread/task_queue.h b/rocketmq-cpp/src/thread/task_queue.h
new file mode 100755
index 0000000..e60607c
--- /dev/null
+++ b/rocketmq-cpp/src/thread/task_queue.h
@@ -0,0 +1,679 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *//********************************************************************
+author:  qiwei.wqw@alibaba-inc.com
+*********************************************************************/
+#ifndef _TASK_QUEUE_I_
+#define _TASK_QUEUE_I_
+
+#include <boost/atomic.hpp>
+#include <boost/thread/mutex.hpp>
+#include <list>
+#include <vector>
+using namespace std;
+
+namespace rocketmq {
+
+//<!***************************************************************************
+typedef void (*taskfunc)(void*);
+
+//<! 数据加操作的集合
+class ITask_impl {
+ public:
+  virtual ~ITask_impl() {}
+  virtual void run() = 0;
+  virtual ITask_impl* fork() = 0;
+};
+
+//<!***************************************************************************
+class Task_impl : public ITask_impl {
+ public:
+  Task_impl(taskfunc func, void* arg_) : m_func(func), m_arg(arg_) {}
+  virtual ~Task_impl() {
+    m_func = 0;
+    m_arg = 0;
+  }
+  virtual void run() {
+    if (m_func != 0) m_func(m_arg);
+  }
+  virtual ITask_impl* fork() { return new Task_impl(m_func, m_arg); }
+
+ protected:
+  taskfunc m_func;
+  void* m_arg;
+};
+
+//<!***************************************************************************
+//<! 构造ITask_impl的子类对象时,为其赋予不同的数据和操作即可。
+//<! 这里使用了组合的方式实现了接口和实现的分离;
+//<!***************************************************************************
+struct Task {
+  static void dumy(void*) {}
+
+  Task(taskfunc f_, void* d_) { m_pTaskImpl = new Task_impl(f_, d_); }
+  Task(ITask_impl* task_imp_) : m_pTaskImpl(task_imp_) {}
+  Task(const Task& src_) : m_pTaskImpl(src_.m_pTaskImpl->fork()) {}
+  Task() { m_pTaskImpl = new Task_impl(&Task::dumy, 0); }
+  virtual ~Task() { delete m_pTaskImpl; }
+  Task& operator=(const Task& src_) {
+    delete m_pTaskImpl;
+    m_pTaskImpl = src_.m_pTaskImpl->fork();
+    return *this;
+  }
+  void run() {
+    if (m_pTaskImpl) m_pTaskImpl->run();
+  }
+
+ private:
+  ITask_impl* m_pTaskImpl;
+};
+
+//<!***************************************************************************
+class ITaskQueue {
+ public:
+  typedef list<Task> TaskList;
+
+ public:
+  virtual ~ITaskQueue() {}
+  virtual void close() = 0;
+  virtual void produce(const Task& task) = 0;
+  // virtual void multi_produce(const TaskList& tasks) = 0;
+  // virtual int  consume(Task& task)                  = 0;
+  // virtual int  consume_all(TaskList& tasks)         = 0;
+  virtual int run() = 0;
+  // virtual int  batch_run()                          = 0;
+  virtual bool bTaskQueueStatusOK() = 0;
+};
+
+//<!***************************************************************************
+//<! 由于不同的操作和数据可能需要构造不同ITask_impl子类,
+//<!
+//我们需要提供一些泛型函数,能够将用户的所有操作和数据都能轻易的转换成Task对象。
+//<! TaskBinder 提供一系列的gen函数,能够转换用户的普通函数和数据为Task对象;
+//<!***************************************************************************
+struct TaskBinder {
+  static Task gen(void (*func)(void*), void* p_) { return Task(func, p_); }
+
+  template <typename RET>
+  static Task gen(RET (*func)(void)) {
+    struct lambda {
+      static void taskfunc(void* p_) { (*(RET(*)(void))p_)(); };
+    };
+    return Task(lambda::taskfunc, (void*)func);
+  }
+
+  template <typename FUNCT, typename ARG1>
+  static Task gen(FUNCT func, ARG1 arg1) {
+    struct lambda : public ITask_impl {
+      FUNCT dest_func;
+      ARG1 arg1;
+      lambda(FUNCT func, const ARG1& arg1) : dest_func(func), arg1(arg1) {}
+      virtual void run() { (*dest_func)(arg1); }
+      virtual ITask_impl* fork() { return new lambda(dest_func, arg1); }
+    };
+    return Task(new lambda(func, arg1));
+  }
+
+  template <typename FUNCT, typename ARG1, typename ARG2>
+  static Task gen(FUNCT func, ARG1 arg1, ARG2 arg2) {
+    struct lambda : public ITask_impl {
+      FUNCT dest_func;
+      ARG1 arg1;
+      ARG2 arg2;
+      lambda(FUNCT func, const ARG1& arg1, const ARG2& arg2)
+          : dest_func(func), arg1(arg1), arg2(arg2) {}
+      virtual void run() { (*dest_func)(arg1, arg2); }
+      virtual ITask_impl* fork() { return new lambda(dest_func, arg1, arg2); }
+    };
+    return Task(new lambda(func, arg1, arg2));
+  }
+
+  template <typename FUNCT, typename ARG1, typename ARG2, typename ARG3>
+  static Task gen(FUNCT func, ARG1 arg1, ARG2 arg2, ARG3 arg3) {
+    struct lambda : public ITask_impl {
+      FUNCT dest_func;
+      ARG1 arg1;
+      ARG2 arg2;
+      ARG3 arg3;
+      lambda(FUNCT func, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3)
+          : dest_func(func), arg1(arg1), arg2(arg2), arg3(arg3) {}
+      virtual void run() { (*dest_func)(arg1, arg2, arg3); }
+      virtual ITask_impl* fork() {
+        return new lambda(dest_func, arg1, arg2, arg3);
+      }
+    };
+    return Task(new lambda(func, arg1, arg2, arg3));
+  }
+
+  template <typename FUNCT, typename ARG1, typename ARG2, typename ARG3,
+            typename ARG4>
+  static Task gen(FUNCT func, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4) {
+    struct lambda : public ITask_impl {
+      FUNCT dest_func;
+      ARG1 arg1;
+      ARG2 arg2;
+      ARG3 arg3;
+      ARG4 arg4;
+      lambda(FUNCT func, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3,
+             const ARG4& arg4)
+          : dest_func(func), arg1(arg1), arg2(arg2), arg3(arg3), arg4(arg4) {}
+      virtual void run() { (*dest_func)(arg1, arg2, arg3, arg4); }
+      virtual ITask_impl* fork() {
+        return new lambda(dest_func, arg1, arg2, arg3, arg4);
+      }
+    };
+    return Task(new lambda(func, arg1, arg2, arg3, arg4));
+  }
+
+  template <typename FUNCT, typename ARG1, typename ARG2, typename ARG3,
+            typename ARG4, typename ARG5>
+  static Task gen(FUNCT func, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4,
+                  ARG5 arg5) {
+    struct lambda : public ITask_impl {
+      FUNCT dest_func;
+      ARG1 arg1;
+      ARG2 arg2;
+      ARG3 arg3;
+      ARG4 arg4;
+      ARG5 arg5;
+      lambda(FUNCT func, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3,
+             const ARG4& arg4, const ARG5& arg5)
+          : dest_func(func),
+            arg1(arg1),
+            arg2(arg2),
+            arg3(arg3),
+            arg4(arg4),
+            arg5(arg5) {}
+      virtual void run() { (*dest_func)(arg1, arg2, arg3, arg4, arg5); }
+      virtual ITask_impl* fork() {
+        return new lambda(dest_func, arg1, arg2, arg3, arg4, arg5);
+      }
+    };
+    return Task(new lambda(func, arg1, arg2, arg3, arg4, arg5));
+  }
+
+  template <typename FUNCT, typename ARG1, typename ARG2, typename ARG3,
+            typename ARG4, typename ARG5, typename ARG6>
+  static Task gen(FUNCT func, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4,
+                  ARG5 arg5, ARG6 arg6) {
+    struct lambda : public ITask_impl {
+      FUNCT dest_func;
+      ARG1 arg1;
+      ARG2 arg2;
+      ARG3 arg3;
+      ARG4 arg4;
+      ARG5 arg5;
+      ARG6 arg6;
+      lambda(FUNCT func, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3,
+             const ARG4& arg4, const ARG5& arg5, const ARG6& arg6)
+          : dest_func(func),
+            arg1(arg1),
+            arg2(arg2),
+            arg3(arg3),
+            arg4(arg4),
+            arg5(arg5),
+            arg6(arg6) {}
+      virtual void run() { (*dest_func)(arg1, arg2, arg3, arg4, arg5, arg6); }
+      virtual ITask_impl* fork() {
+        return new lambda(dest_func, arg1, arg2, arg3, arg4, arg5, arg6);
+      }
+    };
+    return Task(new lambda(func, arg1, arg2, arg3, arg4, arg5, arg6));
+  }
+
+  template <typename FUNCT, typename ARG1, typename ARG2, typename ARG3,
+            typename ARG4, typename ARG5, typename ARG6, typename ARG7>
+  static Task gen(FUNCT func, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4,
+                  ARG5 arg5, ARG6 arg6, ARG7 arg7) {
+    struct lambda : public ITask_impl {
+      FUNCT dest_func;
+      ARG1 arg1;
+      ARG2 arg2;
+      ARG3 arg3;
+      ARG4 arg4;
+      ARG5 arg5;
+      ARG6 arg6;
+      ARG7 arg7;
+      lambda(FUNCT func, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3,
+             const ARG4& arg4, const ARG5& arg5, const ARG6& arg6,
+             const ARG7& arg7)
+          : dest_func(func),
+            arg1(arg1),
+            arg2(arg2),
+            arg3(arg3),
+            arg4(arg4),
+            arg5(arg5),
+            arg6(arg6),
+            arg7(arg7) {}
+      virtual void run() {
+        (*dest_func)(arg1, arg2, arg3, arg4, arg5, arg6, arg7);
+      }
+      virtual ITask_impl* fork() {
+        return new lambda(dest_func, arg1, arg2, arg3, arg4, arg5, arg6, arg7);
+      }
+    };
+    return Task(new lambda(func, arg1, arg2, arg3, arg4, arg5, arg6, arg7));
+  }
+
+  template <typename FUNCT, typename ARG1, typename ARG2, typename ARG3,
+            typename ARG4, typename ARG5, typename ARG6, typename ARG7,
+            typename ARG8>
+  static Task gen(FUNCT func, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4,
+                  ARG5 arg5, ARG6 arg6, ARG7 arg7, ARG8 arg8) {
+    struct lambda : public ITask_impl {
+      FUNCT dest_func;
+      ARG1 arg1;
+      ARG2 arg2;
+      ARG3 arg3;
+      ARG4 arg4;
+      ARG5 arg5;
+      ARG6 arg6;
+      ARG7 arg7;
+      ARG8 arg8;
+      lambda(FUNCT func, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3,
+             const ARG4& arg4, const ARG5& arg5, const ARG6& arg6,
+             const ARG7& arg7, const ARG8& arg8)
+          : dest_func(func),
+            arg1(arg1),
+            arg2(arg2),
+            arg3(arg3),
+            arg4(arg4),
+            arg5(arg5),
+            arg6(arg6),
+            arg7(arg7),
+            arg8(arg8) {}
+      virtual void run() {
+        (*dest_func)(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8);
+      }
+      virtual ITask_impl* fork() {
+        return new lambda(dest_func, arg1, arg2, arg3, arg4, arg5, arg6, arg7,
+                          arg8);
+      }
+    };
+    return Task(
+        new lambda(func, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8));
+  }
+
+  template <typename FUNCT, typename ARG1, typename ARG2, typename ARG3,
+            typename ARG4, typename ARG5, typename ARG6, typename ARG7,
+            typename ARG8, typename ARG9>
+  static Task gen(FUNCT func, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4,
+                  ARG5 arg5, ARG6 arg6, ARG7 arg7, ARG8 arg8, ARG9 arg9) {
+    struct lambda : public ITask_impl {
+      FUNCT dest_func;
+      ARG1 arg1;
+      ARG2 arg2;
+      ARG3 arg3;
+      ARG4 arg4;
+      ARG5 arg5;
+      ARG6 arg6;
+      ARG7 arg7;
+      ARG8 arg8;
+      ARG9 arg9;
+      lambda(FUNCT func, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3,
+             const ARG4& arg4, const ARG5& arg5, const ARG6& arg6,
+             const ARG7& arg7, const ARG8& arg8, const ARG9& arg9)
+          : dest_func(func),
+            arg1(arg1),
+            arg2(arg2),
+            arg3(arg3),
+            arg4(arg4),
+            arg5(arg5),
+            arg6(arg6),
+            arg7(arg7),
+            arg8(arg8),
+            arg9(arg9) {}
+      virtual void run() {
+        (*dest_func)(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9);
+      }
+      virtual ITask_impl* fork() {
+        return new lambda(dest_func, arg1, arg2, arg3, arg4, arg5, arg6, arg7,
+                          arg8, arg9);
+      }
+    };
+    return Task(
+        new lambda(func, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9));
+  }
+
+  //<!***************************************************************************
+  //<! class fuctions;;
+  //<!***************************************************************************
+  template <typename T, typename RET>
+  static Task gen(RET (T::*func)(void), T* obj) {
+    struct lambda : public ITask_impl {
+      RET (T::*dest_func)(void);
+      T* obj;
+      lambda(RET (T::*func)(void), T* obj) : dest_func(func), obj(obj) {}
+      virtual void run() { (obj->*dest_func)(); }
+      virtual ITask_impl* fork() { return new lambda(dest_func, obj); }
+    };
+    return Task(new lambda(func, obj));
+  }
+
+  template <typename T, typename RET, typename FARG1, typename ARG1>
+  static Task gen(RET (T::*func)(FARG1), T* obj, ARG1 arg1) {
+    struct lambda : public ITask_impl {
+      RET (T::*dest_func)(FARG1);
+      T* obj;
+      ARG1 arg1;
+      lambda(RET (T::*pfunc)(FARG1), T* obj, const ARG1& arg1)
+          : dest_func(pfunc), obj(obj), arg1(arg1) {}
+      virtual void run() { (obj->*dest_func)(arg1); }
+      virtual ITask_impl* fork() { return new lambda(dest_func, obj, arg1); }
+    };
+    return Task(new lambda(func, obj, arg1));
+  }
+
+  template <typename T, typename RET, typename FARG1, typename FARG2,
+            typename ARG1, typename ARG2>
+  static Task gen(RET (T::*func)(FARG1, FARG2), T* obj, ARG1 arg1, ARG2 arg2) {
+    struct lambda : public ITask_impl {
+      RET (T::*dest_func)(FARG1, FARG2);
+      T* obj;
+      ARG1 arg1;
+      ARG2 arg2;
+      lambda(RET (T::*func)(FARG1, FARG2), T* obj, const ARG1& arg1,
+             const ARG2& arg2)
+          : dest_func(func), obj(obj), arg1(arg1), arg2(arg2) {}
+      virtual void run() { (obj->*dest_func)(arg1, arg2); }
+      virtual ITask_impl* fork() {
+        return new lambda(dest_func, obj, arg1, arg2);
+      }
+    };
+    return Task(new lambda(func, obj, arg1, arg2));
+  }
+
+  template <typename T, typename RET, typename FARG1, typename FARG2,
+            typename FARG3, typename ARG1, typename ARG2, typename ARG3>
+  static Task gen(RET (T::*func)(FARG1, FARG2, FARG3), T* obj, ARG1 arg1,
+                  ARG2 arg2, ARG3 arg3) {
+    struct lambda : public ITask_impl {
+      RET (T::*dest_func)(FARG1, FARG2, FARG3);
+      T* obj;
+      ARG1 arg1;
+      ARG2 arg2;
+      ARG3 arg3;
+      lambda(RET (T::*func)(FARG1, FARG2, FARG3), T* obj, const ARG1& arg1,
+             const ARG2& arg2, const ARG3& arg3)
+          : dest_func(func), obj(obj), arg1(arg1), arg2(arg2), arg3(arg3) {}
+      virtual void run() { (obj->*dest_func)(arg1, arg2, arg3); }
+      virtual ITask_impl* fork() {
+        return new lambda(dest_func, obj, arg1, arg2, arg3);
+      }
+    };
+    return Task(new lambda(func, obj, arg1, arg2, arg3));
+  }
+
+  template <typename T, typename RET, typename FARG1, typename FARG2,
+            typename FARG3, typename FARG4, typename ARG1, typename ARG2,
+            typename ARG3, typename ARG4>
+  static Task gen(RET (T::*func)(FARG1, FARG2, FARG3, FARG4), T* obj, ARG1 arg1,
+                  ARG2 arg2, ARG3 arg3, ARG4 arg4) {
+    struct lambda : public ITask_impl {
+      RET (T::*dest_func)(FARG1, FARG2, FARG3, FARG4);
+      T* obj;
+      ARG1 arg1;
+      ARG2 arg2;
+      ARG3 arg3;
+      ARG4 arg4;
+      lambda(RET (T::*func)(FARG1, FARG2, FARG3, FARG4), T* obj,
+             const ARG1& arg1, const ARG2& arg2, const ARG3& arg3,
+             const ARG4& arg4)
+          : dest_func(func),
+            obj(obj),
+            arg1(arg1),
+            arg2(arg2),
+            arg3(arg3),
+            arg4(arg4) {}
+      virtual void run() { (obj->*dest_func)(arg1, arg2, arg3, arg4); }
+      virtual ITask_impl* fork() {
+        return new lambda(dest_func, obj, arg1, arg2, arg3, arg4);
+      }
+    };
+    return Task(new lambda(func, obj, arg1, arg2, arg3, arg4));
+  }
+
+  template <typename T, typename RET, typename FARG1, typename FARG2,
+            typename FARG3, typename FARG4, typename FARG5, typename ARG1,
+            typename ARG2, typename ARG3, typename ARG4, typename ARG5>
+  static Task gen(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5), T* obj,
+                  ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4, ARG5 arg5) {
+    struct lambda : public ITask_impl {
+      RET (T::*dest_func)(FARG1, FARG2, FARG3, FARG4, FARG5);
+      T* obj;
+      ARG1 arg1;
+      ARG2 arg2;
+      ARG3 arg3;
+      ARG4 arg4;
+      ARG5 arg5;
+      lambda(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5), T* obj,
+             const ARG1& arg1, const ARG2& arg2, const ARG3& arg3,
+             const ARG4& arg4, const ARG5& arg5)
+          : dest_func(func),
+            obj(obj),
+            arg1(arg1),
+            arg2(arg2),
+            arg3(arg3),
+            arg4(arg4),
+            arg5(arg5) {}
+      virtual void run() { (obj->*dest_func)(arg1, arg2, arg3, arg4, arg5); }
+      virtual ITask_impl* fork() {
+        return new lambda(dest_func, obj, arg1, arg2, arg3, arg4, arg5);
+      }
+    };
+    return Task(new lambda(func, obj, arg1, arg2, arg3, arg4, arg5));
+  }
+
+  template <typename T, typename RET, typename FARG1, typename FARG2,
+            typename FARG3, typename FARG4, typename FARG5, typename FARG6,
+            typename ARG1, typename ARG2, typename ARG3, typename ARG4,
+            typename ARG5, typename ARG6>
+  static Task gen(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6),
+                  T* obj, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4, ARG5 arg5,
+                  ARG6 arg6) {
+    struct lambda : public ITask_impl {
+      RET (T::*dest_func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6);
+      T* obj;
+      ARG1 arg1;
+      ARG2 arg2;
+      ARG3 arg3;
+      ARG4 arg4;
+      ARG5 arg5;
+      ARG6 arg6;
+      lambda(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6), T* obj,
+             const ARG1& arg1, const ARG2& arg2, const ARG3& arg3,
+             const ARG4& arg4, const ARG5& arg5, const ARG6& arg6)
+          : dest_func(func),
+            obj(obj),
+            arg1(arg1),
+            arg2(arg2),
+            arg3(arg3),
+            arg4(arg4),
+            arg5(arg5),
+            arg6(arg6) {}
+      virtual void run() {
+        (obj->*dest_func)(arg1, arg2, arg3, arg4, arg5, arg6);
+      }
+      virtual ITask_impl* fork() {
+        return new lambda(dest_func, obj, arg1, arg2, arg3, arg4, arg5, arg6);
+      }
+    };
+    return Task(new lambda(func, obj, arg1, arg2, arg3, arg4, arg5, arg6));
+  }
+
+  template <typename T, typename RET, typename FARG1, typename FARG2,
+            typename FARG3, typename FARG4, typename FARG5, typename FARG6,
+            typename FARG7, typename ARG1, typename ARG2, typename ARG3,
+            typename ARG4, typename ARG5, typename ARG6, typename ARG7>
+  static Task gen(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6,
+                                 FARG7),
+                  T* obj, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4, ARG5 arg5,
+                  ARG6 arg6, ARG7 arg7) {
+    struct lambda : public ITask_impl {
+      RET (T::*dest_func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6, FARG7);
+      T* obj;
+      ARG1 arg1;
+      ARG2 arg2;
+      ARG3 arg3;
+      ARG4 arg4;
+      ARG5 arg5;
+      ARG6 arg6;
+      ARG7 arg7;
+      lambda(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6, FARG7),
+             T* obj, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3,
+             const ARG4& arg4, const ARG5& arg5, const ARG6& arg6,
+             const ARG7& arg7)
+          : dest_func(func),
+            obj(obj),
+            arg1(arg1),
+            arg2(arg2),
+            arg3(arg3),
+            arg4(arg4),
+            arg5(arg5),
+            arg6(arg6),
+            arg7(arg7) {}
+      virtual void run() {
+        (obj->*dest_func)(arg1, arg2, arg3, arg4, arg5, arg6, arg7);
+      }
+      virtual ITask_impl* fork() {
+        return new lambda(dest_func, obj, arg1, arg2, arg3, arg4, arg5, arg6,
+                          arg7);
+      }
+    };
+    return Task(
+        new lambda(func, obj, arg1, arg2, arg3, arg4, arg5, arg6, arg7));
+  }
+
+  template <typename T, typename RET, typename FARG1, typename FARG2,
+            typename FARG3, typename FARG4, typename FARG5, typename FARG6,
+            typename FARG7, typename FARG8, typename ARG1, typename ARG2,
+            typename ARG3, typename ARG4, typename ARG5, typename ARG6,
+            typename ARG7, typename ARG8>
+  static Task gen(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6,
+                                 FARG7, FARG8),
+                  T* obj, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4, ARG5 arg5,
+                  ARG6 arg6, ARG7 arg7, ARG8 arg8) {
+    struct lambda : public ITask_impl {
+      RET (T::*dest_func)
+      (FARG1, FARG2, FARG3, FARG4, FARG5, FARG6, FARG7, FARG8);
+      T* obj;
+      ARG1 arg1;
+      ARG2 arg2;
+      ARG3 arg3;
+      ARG4 arg4;
+      ARG5 arg5;
+      ARG6 arg6;
+      ARG7 arg7;
+      ARG8 arg8;
+      lambda(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6, FARG7,
+                            FARG8),
+             T* obj, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3,
+             const ARG4& arg4, const ARG5& arg5, const ARG6& arg6,
+             const ARG7& arg7, const ARG8& arg8)
+          : dest_func(func),
+            obj(obj),
+            arg1(arg1),
+            arg2(arg2),
+            arg3(arg3),
+            arg4(arg4),
+            arg5(arg5),
+            arg6(arg6),
+            arg7(arg7),
+            arg8(arg8) {}
+      virtual void run() {
+        (obj->*dest_func)(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8);
+      }
+      virtual ITask_impl* fork() {
+        return new lambda(dest_func, obj, arg1, arg2, arg3, arg4, arg5, arg6,
+                          arg7, arg8);
+      }
+    };
+    return Task(
+        new lambda(func, obj, arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8));
+  }
+
+  template <typename T, typename RET, typename FARG1, typename FARG2,
+            typename FARG3, typename FARG4, typename FARG5, typename FARG6,
+            typename FARG7, typename FARG8, typename FARG9, typename ARG1,
+            typename ARG2, typename ARG3, typename ARG4, typename ARG5,
+            typename ARG6, typename ARG7, typename ARG8, typename ARG9>
+  static Task gen(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6,
+                                 FARG7, FARG8, FARG9),
+                  T* obj, ARG1 arg1, ARG2 arg2, ARG3 arg3, ARG4 arg4, ARG5 arg5,
+                  ARG6 arg6, ARG7 arg7, ARG8 arg8, ARG9 arg9) {
+    struct lambda : public ITask_impl {
+      RET (T::*dest_func)
+      (FARG1, FARG2, FARG3, FARG4, FARG5, FARG6, FARG7, FARG8, FARG9);
+      T* obj;
+      ARG1 arg1;
+      ARG2 arg2;
+      ARG3 arg3;
+      ARG4 arg4;
+      ARG5 arg5;
+      ARG6 arg6;
+      ARG7 arg7;
+      ARG8 arg8;
+      ARG9 arg9;
+      lambda(RET (T::*func)(FARG1, FARG2, FARG3, FARG4, FARG5, FARG6, FARG7,
+                            FARG8, FARG9),
+             T* obj, const ARG1& arg1, const ARG2& arg2, const ARG3& arg3,
+             const ARG4& arg4, const ARG5& arg5, const ARG6& arg6,
+             const ARG7& arg7, const ARG8& arg8, const ARG9& arg9)
+          : dest_func(func),
+            obj(obj),
+            arg1(arg1),
+            arg2(arg2),
+            arg3(arg3),
+            arg4(arg4),
+            arg5(arg5),
+            arg6(arg6),
+            arg7(arg7),
+            arg8(arg8),
+            arg9(arg9) {}
+      virtual void run() {
+        (obj->*dest_func)(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9);
+      }
+      virtual ITask_impl* fork() {
+        return new lambda(dest_func, obj, arg1, arg2, arg3, arg4, arg5, arg6,
+                          arg7, arg8, arg9);
+      }
+    };
+    return Task(new lambda(func, obj, arg1, arg2, arg3, arg4, arg5, arg6, arg7,
+                           arg8, arg9));
+  }
+};
+
+//<!***************************************************************************
+class disruptorLFQ;
+class TaskQueue : public ITaskQueue {
+ public:
+  TaskQueue(int threadCount);
+  virtual ~TaskQueue();
+  virtual void close();
+  virtual void produce(const Task& task);
+  virtual int run();
+  virtual bool bTaskQueueStatusOK();
+
+ private:
+  boost::atomic<bool> m_flag;
+  disruptorLFQ* m_disruptorLFQ;
+  boost::mutex m_publishLock;
+};
+
+//<!***************************************************************************
+}  //<! end namespace;
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/transport/ClientRemotingProcessor.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/transport/ClientRemotingProcessor.cpp b/rocketmq-cpp/src/transport/ClientRemotingProcessor.cpp
new file mode 100644
index 0000000..a1462e6
--- /dev/null
+++ b/rocketmq-cpp/src/transport/ClientRemotingProcessor.cpp
@@ -0,0 +1,146 @@
+#include "ClientRemotingProcessor.h"
+#include "ClientRPCHook.h"
+#include "ConsumerRunningInfo.h"
+#include "MQClientFactory.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+
+ClientRemotingProcessor::ClientRemotingProcessor(
+    MQClientFactory* mqClientFactory)
+    : m_mqClientFactory(mqClientFactory) {}
+
+ClientRemotingProcessor::~ClientRemotingProcessor() {}
+
+RemotingCommand* ClientRemotingProcessor::processRequest(
+    const string& addr, RemotingCommand* request) {
+  LOG_DEBUG("request Command received:processRequest");
+  switch (request->getCode()) {
+    case CHECK_TRANSACTION_STATE:
+      //  return checkTransactionState( request);
+      break;
+    case NOTIFY_CONSUMER_IDS_CHANGED:
+      return notifyConsumerIdsChanged(request);
+      break;
+    case RESET_CONSUMER_CLIENT_OFFSET:  // oneWayRPC
+      return resetOffset(request);
+    case GET_CONSUMER_STATUS_FROM_CLIENT:
+      // return getConsumeStatus( request);
+      break;
+    case GET_CONSUMER_RUNNING_INFO:
+      return getConsumerRunningInfo(addr, request);
+      break;
+    case CONSUME_MESSAGE_DIRECTLY:
+      // return consumeMessageDirectly( request);
+      break;
+    default:
+      break;
+  }
+  return NULL;
+}
+
+RemotingCommand* ClientRemotingProcessor::resetOffset(
+    RemotingCommand* request) {
+  request->SetExtHeader(request->getCode());
+  const MemoryBlock* pbody = request->GetBody();
+  if (pbody->getSize()) {
+    ResetOffsetBody* offsetBody = ResetOffsetBody::Decode(pbody);
+    ResetOffsetRequestHeader* offsetHeader =
+        (ResetOffsetRequestHeader*)request->getCommandHeader();
+    if (offsetBody) {
+      m_mqClientFactory->resetOffset(offsetHeader->getGroup(),
+                                     offsetHeader->getTopic(),
+                                     offsetBody->getOffsetTable());
+    } else {
+      LOG_ERROR(
+          "resetOffset failed as received data could not be unserialized");
+    }
+  }
+  return NULL;  // as resetOffset is oneWayRPC, do not need return any response
+}
+
+std::map<MQMessageQueue, int64> ResetOffsetBody::getOffsetTable() {
+  return m_offsetTable;
+}
+
+void ResetOffsetBody::setOffsetTable(MQMessageQueue mq, int64 offset) {
+  m_offsetTable[mq] = offset;
+}
+
+ResetOffsetBody* ResetOffsetBody::Decode(const MemoryBlock* mem) {
+  const char* const pData = static_cast<const char*>(mem->getData());
+  Json::Reader reader;
+  Json::Value root;
+  const char* begin = pData;
+  const char* end = pData + mem->getSize();
+
+  if (!reader.parse(begin, end, root, true)) {
+    LOG_ERROR("ResetOffsetBody::Decode fail");
+    return NULL;
+  }
+
+  ResetOffsetBody* rfb = new ResetOffsetBody();
+  Json::Value qds = root["offsetTable"];
+  for (unsigned int i = 0; i < qds.size(); i++) {
+    MQMessageQueue mq;
+    Json::Value qd = qds[i];
+    mq.setBrokerName(qd["brokerName"].asString());
+    mq.setQueueId(qd["queueId"].asInt());
+    mq.setTopic(qd["topic"].asString());
+    int64 offset = qd["offset"].asInt();
+    LOG_INFO("ResetOffsetBody brokerName:%s, queueID:%d, topic:%s, offset:%lld",
+             mq.getBrokerName().c_str(), mq.getQueueId(), mq.getTopic().c_str(),
+             offset);
+    rfb->setOffsetTable(mq, offset);
+  }
+  return rfb;
+}
+
+RemotingCommand* ClientRemotingProcessor::getConsumerRunningInfo(
+    const string& addr, RemotingCommand* request) {
+  request->SetExtHeader(request->getCode());
+  GetConsumerRunningInfoRequestHeader* requestHeader =
+      (GetConsumerRunningInfoRequestHeader*)request->getCommandHeader();
+  LOG_INFO("getConsumerRunningInfo:%s",
+           requestHeader->getConsumerGroup().c_str());
+
+  RemotingCommand* pResponse = new RemotingCommand(
+      request->getCode(), "CPP", request->getVersion(), request->getOpaque(),
+      request->getFlag(), request->getRemark(), NULL);
+
+  unique_ptr<ConsumerRunningInfo> runningInfo(
+      m_mqClientFactory->consumerRunningInfo(
+          requestHeader->getConsumerGroup()));
+  if (runningInfo) {
+    if (requestHeader->isJstackEnable()) {
+      /*string jstack = UtilAll::jstack();
+       consumerRunningInfo->setJstack(jstack);*/
+    }
+    pResponse->setCode(SUCCESS_VALUE);
+    string body = runningInfo->encode();
+    pResponse->SetBody(body.c_str(), body.length());
+    pResponse->setMsgBody(body);
+  } else {
+    pResponse->setCode(SYSTEM_ERROR);
+    pResponse->setRemark("The Consumer Group not exist in this consumer");
+  }
+
+  SessionCredentials sessionCredentials;
+  m_mqClientFactory->getSessionCredentialFromConsumer(
+      requestHeader->getConsumerGroup(), sessionCredentials);
+  ClientRPCHook rpcHook(sessionCredentials);
+  rpcHook.doBeforeRequest(addr, *pResponse);
+  pResponse->Encode();
+  return pResponse;
+}
+
+RemotingCommand* ClientRemotingProcessor::notifyConsumerIdsChanged(
+    RemotingCommand* request) {
+  request->SetExtHeader(request->getCode());
+  NotifyConsumerIdsChangedRequestHeader* requestHeader =
+      (NotifyConsumerIdsChangedRequestHeader*)request->getCommandHeader();
+  LOG_INFO("notifyConsumerIdsChanged:%s", requestHeader->getGroup().c_str());
+  m_mqClientFactory->doRebalanceByConsumerGroup(requestHeader->getGroup());
+  return NULL;
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/transport/ClientRemotingProcessor.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/transport/ClientRemotingProcessor.h b/rocketmq-cpp/src/transport/ClientRemotingProcessor.h
new file mode 100644
index 0000000..1bb757e
--- /dev/null
+++ b/rocketmq-cpp/src/transport/ClientRemotingProcessor.h
@@ -0,0 +1,39 @@
+#ifndef __CLIENTREMOTINGPROCESSOR_H__
+#define __CLIENTREMOTINGPROCESSOR_H__
+
+#include "MQMessageQueue.h"
+#include "MQProtos.h"
+#include "RemotingCommand.h"
+
+namespace rocketmq {
+
+class MQClientFactory;
+class ClientRemotingProcessor {
+ public:
+  ClientRemotingProcessor(MQClientFactory* mqClientFactory);
+  virtual ~ClientRemotingProcessor();
+
+  RemotingCommand* processRequest(const string& addr, RemotingCommand* request);
+  RemotingCommand* resetOffset(RemotingCommand* request);
+  RemotingCommand* getConsumerRunningInfo(const string& addr,
+                                          RemotingCommand* request);
+  RemotingCommand* notifyConsumerIdsChanged(RemotingCommand* request);
+
+ private:
+  MQClientFactory* m_mqClientFactory;
+};
+
+class ResetOffsetBody {
+ public:
+  ResetOffsetBody() {}
+  virtual ~ResetOffsetBody() { m_offsetTable.clear(); }
+  void setOffsetTable(MQMessageQueue mq, int64 offset);
+  std::map<MQMessageQueue, int64> getOffsetTable();
+  static ResetOffsetBody* Decode(const MemoryBlock* mem);
+
+ private:
+  std::map<MQMessageQueue, int64> m_offsetTable;
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/transport/ResponseFuture.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/transport/ResponseFuture.cpp b/rocketmq-cpp/src/transport/ResponseFuture.cpp
new file mode 100755
index 0000000..05cef84
--- /dev/null
+++ b/rocketmq-cpp/src/transport/ResponseFuture.cpp
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ResponseFuture.h"
+#include "Logging.h"
+#include "TcpRemotingClient.h"
+
+namespace rocketmq {
+//<!************************************************************************
+ResponseFuture::ResponseFuture(int requestCode, int opaque,
+                               TcpRemotingClient* powner, int64 timeout,
+                               bool bAsync /* = false */,
+                               AsyncCallbackWrap* pcall /* = NULL */) {
+  m_bAsync.store(bAsync);
+  m_requestCode = requestCode;
+  m_opaque = opaque;
+  m_timeout = timeout;
+  m_pCallbackWrap = pcall;
+  m_pResponseCommand = NULL;
+  m_sendRequestOK = false;
+  m_beginTimestamp = UtilAll::currentTimeMillis();
+  m_asyncCallbackStatus = asyncCallBackStatus_init;
+  if (getASyncFlag()) {
+    m_asyncResponse.store(false);
+    m_syncResponse.store(true);
+  } else {
+    m_asyncResponse.store(true);
+    m_syncResponse.store(false);
+  }
+}
+
+ResponseFuture::~ResponseFuture() {
+  deleteAndZero(m_pCallbackWrap);
+  /*
+    do not set m_pResponseCommand to NULL when destruct, as m_pResponseCommand
+    is used by MQClientAPIImpl concurrently, and will be released by producer or
+    consumer;
+    m_pResponseCommand = NULL;
+  */
+}
+
+void ResponseFuture::releaseThreadCondition() { m_defaultEvent.notify_all(); }
+
+RemotingCommand* ResponseFuture::waitResponse(int timeoutMillis) {
+  boost::unique_lock<boost::mutex> lk(m_defaultEventLock);
+  if (!m_defaultEvent.timed_wait(
+          lk, boost::posix_time::milliseconds(timeoutMillis))) {
+    LOG_WARN("waitResponse of code:%d with opaque:%d timeout", m_requestCode,
+             m_opaque);
+    m_syncResponse.store(true);
+  }
+  return m_pResponseCommand;
+}
+
+void ResponseFuture::setResponse(RemotingCommand* pResponseCommand) {
+  // LOG_DEBUG("setResponse of opaque:%d",m_opaque);
+  m_pResponseCommand = pResponseCommand;
+
+  if (!getASyncFlag()) {
+    if (m_syncResponse.load() == false) {
+      m_defaultEvent.notify_all();
+      m_syncResponse.store(true);
+    }
+  }
+}
+
+const bool ResponseFuture::getSyncResponseFlag() {
+  if (m_syncResponse.load() == true) {
+    return true;
+  }
+  return false;
+}
+
+const bool ResponseFuture::getAsyncResponseFlag() {
+  if (m_asyncResponse.load() == true) {
+    // LOG_DEBUG("ASYNC flag is TRUE,opaque is:%d",getOpaque() );
+    return true;
+  }
+
+  return false;
+}
+
+void ResponseFuture::setAsyncResponseFlag() { m_asyncResponse.store(true); }
+
+const bool ResponseFuture::getASyncFlag() {
+  if (m_bAsync.load() == true) {
+    // LOG_DEBUG("ASYNC flag is TRUE,opaque is:%d",getOpaque() );
+    return true;
+  }
+  return false;
+}
+
+bool ResponseFuture::isSendRequestOK() { return m_sendRequestOK; }
+
+void ResponseFuture::setSendRequestOK(bool sendRequestOK) {
+  m_sendRequestOK = sendRequestOK;
+}
+
+int ResponseFuture::getOpaque() const { return m_opaque; }
+
+int ResponseFuture::getRequestCode() const { return m_requestCode; }
+
+void ResponseFuture::setAsyncCallBackStatus(
+    asyncCallBackStatus asyncCallbackStatus) {
+  boost::lock_guard<boost::mutex> lock(m_asyncCallbackLock);
+  if (m_asyncCallbackStatus == asyncCallBackStatus_init) {
+    m_asyncCallbackStatus = asyncCallbackStatus;
+  }
+}
+
+void ResponseFuture::executeInvokeCallback() {
+  if (m_pCallbackWrap == NULL) {
+    deleteAndZero(m_pResponseCommand);
+    return;
+  } else {
+    if (m_asyncCallbackStatus == asyncCallBackStatus_response) {
+      m_pCallbackWrap->operationComplete(this, true);
+    } else {
+      if (m_pResponseCommand)
+        deleteAndZero(m_pResponseCommand);  // the responseCommand from
+                                            // RemotingCommand::Decode(mem) will
+                                            // only deleted by operationComplete
+                                            // automatically
+      LOG_WARN(
+          "timeout and response incoming concurrently of opaque:%d, and "
+          "executeInvokeCallbackException was called earlier",
+          m_opaque);
+    }
+  }
+}
+
+void ResponseFuture::executeInvokeCallbackException() {
+  if (m_pCallbackWrap == NULL) {
+    LOG_ERROR("m_pCallbackWrap is NULL, critical error");
+    return;
+  } else {
+    if (m_asyncCallbackStatus == asyncCallBackStatus_timeout) {
+      m_pCallbackWrap->onException();
+    } else {
+      LOG_WARN(
+          "timeout and response incoming concurrently of opaque:%d, and "
+          "executeInvokeCallback was called earlier",
+          m_opaque);
+    }
+  }
+}
+
+bool ResponseFuture::isTimeOut() const {
+  int64 diff = UtilAll::currentTimeMillis() - m_beginTimestamp;
+  //<!only async;
+  return m_bAsync.load() == 1 && diff > m_timeout;
+}
+
+RemotingCommand* ResponseFuture::getCommand() const {
+  return m_pResponseCommand;
+}
+
+AsyncCallbackWrap* ResponseFuture::getAsyncCallbackWrap() {
+  return m_pCallbackWrap;
+}
+
+//<!************************************************************************
+}  //<!end namespace;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/transport/ResponseFuture.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/transport/ResponseFuture.h b/rocketmq-cpp/src/transport/ResponseFuture.h
new file mode 100755
index 0000000..92fa772
--- /dev/null
+++ b/rocketmq-cpp/src/transport/ResponseFuture.h
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __RESPONSEFUTURE_H__
+#define __RESPONSEFUTURE_H__
+#include <boost/atomic.hpp>
+#include <boost/thread/condition_variable.hpp>
+#include "AsyncCallbackWrap.h"
+#include "RemotingCommand.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+
+typedef enum asyncCallBackStatus {
+  asyncCallBackStatus_init = 0,
+  asyncCallBackStatus_response = 1,
+  asyncCallBackStatus_timeout = 2
+} asyncCallBackStatus;
+
+class TcpRemotingClient;
+//<!***************************************************************************
+class ResponseFuture {
+ public:
+  ResponseFuture(int requestCode, int opaque, TcpRemotingClient* powner,
+                 int64 timeoutMilliseconds, bool bAsync = false,
+                 AsyncCallbackWrap* pcall = NULL);
+  virtual ~ResponseFuture();
+  void releaseThreadCondition();
+  RemotingCommand* waitResponse(int timeoutMillis);
+  RemotingCommand* getCommand() const;
+
+  void setResponse(RemotingCommand* pResponseCommand);
+  bool isSendRequestOK();
+  void setSendRequestOK(bool sendRequestOK);
+  int getRequestCode() const;
+  int getOpaque() const;
+
+  //<!callback;
+  void executeInvokeCallback();
+  void executeInvokeCallbackException();
+  bool isTimeOut() const;
+  // bool    isTimeOutMoreThan30s() const;
+  const bool getASyncFlag();
+  void setAsyncResponseFlag();
+  const bool getAsyncResponseFlag();
+  const bool getSyncResponseFlag();
+  AsyncCallbackWrap* getAsyncCallbackWrap();
+  void setAsyncCallBackStatus(asyncCallBackStatus asyncCallbackStatus);
+
+ private:
+  int m_requestCode;
+  int m_opaque;
+  bool m_sendRequestOK;
+  boost::mutex m_defaultEventLock;
+  boost::condition_variable_any m_defaultEvent;
+  int64 m_beginTimestamp;
+  int64 m_timeout;  // ms
+  boost::atomic<bool> m_bAsync;
+  RemotingCommand* m_pResponseCommand;  //<!delete outside;
+  AsyncCallbackWrap* m_pCallbackWrap;
+  boost::mutex m_asyncCallbackLock;
+  asyncCallBackStatus m_asyncCallbackStatus;
+  boost::atomic<bool> m_asyncResponse;
+  boost::atomic<bool> m_syncResponse;
+  // TcpRemotingClient*    m_tcpRemoteClient;
+};
+//<!************************************************************************
+}  //<!end namespace;
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/transport/SocketUtil.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/transport/SocketUtil.cpp b/rocketmq-cpp/src/transport/SocketUtil.cpp
new file mode 100755
index 0000000..d428f24
--- /dev/null
+++ b/rocketmq-cpp/src/transport/SocketUtil.cpp
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "SocketUtil.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+sockaddr IPPort2socketAddress(int host, int port) {
+  struct sockaddr_in sa;
+  sa.sin_family = AF_INET;
+  sa.sin_port = htons((uint16)port);
+  sa.sin_addr.s_addr = htonl(host);
+
+  sockaddr bornAddr;
+  memcpy(&bornAddr, &sa, sizeof(sockaddr));
+  return bornAddr;
+}
+
+string socketAddress2IPPort(sockaddr addr) {
+  sockaddr_in sa;
+  memcpy(&sa, &addr, sizeof(sockaddr));
+
+  char tmp[32];
+  sprintf(tmp, "%s:%d", inet_ntoa(sa.sin_addr), ntohs(sa.sin_port));
+
+  string ipport = tmp;
+  return ipport;
+}
+
+void socketAddress2IPPort(sockaddr addr, int& host, int& port) {
+  struct sockaddr_in sa;
+  memcpy(&sa, &addr, sizeof(sockaddr));
+
+  host = ntohl(sa.sin_addr.s_addr);
+  port = ntohs(sa.sin_port);
+}
+
+string socketAddress2String(sockaddr addr) {
+  sockaddr_in in;
+  memcpy(&in, &addr, sizeof(sockaddr));
+
+  return inet_ntoa(in.sin_addr);
+}
+
+string getHostName(sockaddr addr) {
+  sockaddr_in in;
+  memcpy(&in, &addr, sizeof(sockaddr));
+
+  struct hostent* remoteHost = gethostbyaddr((char*)&(in.sin_addr), 4, AF_INET);
+  char** alias = remoteHost->h_aliases;
+  if (*alias != 0) {
+    return *alias;
+  } else {
+    return inet_ntoa(in.sin_addr);
+  }
+}
+
+uint64 swapll(uint64 v) {
+#ifdef ENDIANMODE_BIG
+  return v;
+#else
+  uint64 ret = ((v << 56) | ((v & 0xff00) << 40) | ((v & 0xff0000) << 24) |
+                ((v & 0xff000000) << 8) | ((v >> 8) & 0xff000000) |
+                ((v >> 24) & 0xff0000) | ((v >> 40) & 0xff00) | (v >> 56));
+
+  return ret;
+#endif
+}
+
+uint64 h2nll(uint64 v) { return swapll(v); }
+
+uint64 n2hll(uint64 v) { return swapll(v); }
+}  //<!end namespace;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/transport/SocketUtil.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/transport/SocketUtil.h b/rocketmq-cpp/src/transport/SocketUtil.h
new file mode 100755
index 0000000..7cbba0c
--- /dev/null
+++ b/rocketmq-cpp/src/transport/SocketUtil.h
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __SOCKETUTIL_H__
+#define __SOCKETUTIL_H__
+
+#ifdef WIN32
+#include <WS2tcpip.h>
+#include <Windows.h>
+#include <Winsock2.h>
+#pragma comment(lib, "ws2_32.lib")
+#else
+#include <arpa/inet.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <net/if.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <signal.h>
+#include <sys/ioctl.h>
+#include <sys/select.h>
+#include <sys/socket.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
+#endif
+#include <sys/socket.h>
+#include "UtilAll.h"
+
+namespace rocketmq {
+//<!************************************************************************
+/**
+* IP:PORT
+*/
+sockaddr IPPort2socketAddress(int host, int port);
+string socketAddress2IPPort(sockaddr addr);
+void socketAddress2IPPort(sockaddr addr, int& host, int& port);
+
+string socketAddress2String(sockaddr addr);
+string getHostName(sockaddr addr);
+
+uint64 h2nll(uint64 v);
+uint64 n2hll(uint64 v);
+
+//<!************************************************************************
+}  //<!end namespace;
+
+#endif