You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2017/09/04 06:44:39 UTC
[05/17] incubator-rocketmq-externals git commit: Polish cpp module
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/Mutex.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/Mutex.cpp b/rocketmq-client4cpp/src/kpr/Mutex.cpp
deleted file mode 100755
index f98282c..0000000
--- a/rocketmq-client4cpp/src/kpr/Mutex.cpp
+++ /dev/null
@@ -1,296 +0,0 @@
-/**
- * Copyright (C) 2013 kangliqiang ,kangliq@163.com
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "Mutex.h"
-
-#include <pthread.h>
-#include <unistd.h>
-#include <stdio.h>
-#include <errno.h>
-#include <time.h>
-
-
-namespace kpr
-{
-Mutex::Mutex()
-{
- ::pthread_mutex_init(&m_mutex, NULL);
-}
-
-Mutex::~Mutex()
-{
- ::pthread_mutex_destroy(&m_mutex);
-}
-
-void Mutex::Lock() const
-{
- ::pthread_mutex_lock(&m_mutex);
-}
-
-bool Mutex::TryLock() const
-{
- int ret = ::pthread_mutex_trylock(&m_mutex);
- return (ret == 0);
-}
-
-bool Mutex::TryLock(int timeout) const
-{
- struct timespec ts;
- clock_gettime(CLOCK_REALTIME, &ts);
- ts.tv_sec += (timeout/1000);
- ts.tv_nsec += (timeout%1000) * 1000 * 1000;
-
- int ret = ::pthread_mutex_timedlock(&m_mutex, &ts);
- return (ret == 0);
-}
-
-
-void Mutex::Unlock() const
-{
- ::pthread_mutex_unlock(&m_mutex);
-}
-
-//***********
-//RWMutex
-//***************
-RWMutex::RWMutex()
-{
- ::pthread_rwlock_init(&m_mutex, NULL);
-}
-
-RWMutex::~RWMutex()
-{
- ::pthread_rwlock_destroy(&m_mutex);
-}
-
-void RWMutex::ReadLock() const
-{
- ::pthread_rwlock_rdlock(&m_mutex);
-}
-
-void RWMutex::WriteLock() const
-{
- ::pthread_rwlock_wrlock(&m_mutex);
-}
-
-bool RWMutex::TryReadLock() const
-{
- int ret = ::pthread_rwlock_tryrdlock(&m_mutex);
- return (ret == 0);
-}
-
-bool RWMutex::TryReadLock(int timeout) const
-{
- struct timespec ts;
- clock_gettime(CLOCK_REALTIME, &ts);
- ts.tv_sec += (timeout/1000);
- ts.tv_nsec += (timeout%1000) * 1000 * 1000;
-
- int ret = ::pthread_rwlock_timedrdlock(&m_mutex, &ts);
- return (ret == 0);
-}
-
-bool RWMutex::TryWriteLock() const
-{
- int ret = ::pthread_rwlock_trywrlock(&m_mutex);
- return (ret == 0);
-}
-
-bool RWMutex::TryWriteLock(int timeout) const
-{
- struct timespec ts;
- clock_gettime(CLOCK_REALTIME, &ts);
- ts.tv_sec += (timeout/1000);
- ts.tv_nsec += (timeout%1000) * 1000 * 1000;
-
- int ret = ::pthread_rwlock_timedwrlock(&m_mutex, &ts);
- return (ret == 0);
-}
-
-
-void RWMutex::Unlock() const
-{
- ::pthread_rwlock_unlock(&m_mutex);
-}
-
-
-//***********
-//RecursiveMutex
-//***************
-RecursiveMutex::RecursiveMutex()
- : m_count(0),
- m_owner(ThreadId())
-{
- ::pthread_mutex_init(&m_mutex, NULL);
-}
-
-RecursiveMutex::~RecursiveMutex()
-{
- ::pthread_mutex_destroy(&m_mutex);
-}
-
-bool RecursiveMutex::Lock()const
-{
- return ((RecursiveMutex*)this)->lock(1);
-}
-
-bool RecursiveMutex::Unlock()const
-{
- return ((RecursiveMutex*)this)->unlock();
-}
-
-bool RecursiveMutex::TryLock()const
-{
- return ((RecursiveMutex*)this)->tryLock();
-}
-
-ThreadId RecursiveMutex::GetOwner()const
-{
- m_internal.Lock();
- ThreadId id;
- if (m_count > 0)
- {
- id = m_owner;
- }
- m_internal.Unlock();
-
- return id;
-}
-
-bool RecursiveMutex::lock(int count)
-{
- bool rc = false;
- bool obtained = false;
-
- while (!obtained)
- {
- m_internal.Lock();
-
- if (m_count == 0)
- {
- m_count = count;
- m_owner = ThreadId::GetCurrentThreadId();
- obtained = true;
- rc = true;
-
- try
- {
- ::pthread_mutex_lock(&m_mutex);
- }
- catch (...)
- {
- try
- {
- m_internal.Unlock();
- }
- catch (...)
- {
- }
- throw;
- }
- }
- else if (m_owner == ThreadId::GetCurrentThreadId())
- {
- m_count += count;
- obtained = true;
- }
-
- m_internal.Unlock();
-
- if (!obtained)
- {
- ::pthread_mutex_lock(&m_mutex);
- ::pthread_mutex_unlock(&m_mutex);
- }
- }
-
- return rc;
-}
-
-bool RecursiveMutex::tryLock()
-{
- bool obtained = false;
-
- m_internal.Lock();
-
- if (m_count == 0)
- {
- m_count = 1;
- m_owner = ThreadId::GetCurrentThreadId();
- obtained = true;
-
- try
- {
- ::pthread_mutex_lock(&m_mutex);
- }
- catch (...)
- {
- try
- {
- m_internal.Unlock();
- }
- catch (...)
- {
- }
- throw;
- }
- }
- else if (m_owner == ThreadId::GetCurrentThreadId())
- {
- ++m_count;
- obtained = true;
- }
-
- m_internal.Unlock();
-
- return obtained;
-}
-
-bool RecursiveMutex::unlock()
-{
- bool rc;
- m_internal.Lock();
-
- if (--m_count == 0)
- {
- m_owner = ThreadId();
-
- ::pthread_mutex_unlock(&m_mutex);
-
- rc = true;
- }
- else
- {
- rc = false;
- }
-
- m_internal.Unlock();
-
- return rc;
-}
-
-unsigned int RecursiveMutex::reset4Condvar()
-{
- m_internal.Lock();
-
- unsigned int count = m_count;
- m_count = 0;
- m_owner = ThreadId();
-
- m_internal.Unlock();
-
- return count;
-}
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/Mutex.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/Mutex.h b/rocketmq-client4cpp/src/kpr/Mutex.h
deleted file mode 100755
index fc3498f..0000000
--- a/rocketmq-client4cpp/src/kpr/Mutex.h
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Copyright (C) 2013 kangliqiang ,kangliq@163.com
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __KPR_MUTEX_H__
-#define __KPR_MUTEX_H__
-
-#include "KPRTypes.h"
-#include <errno.h>
-
-namespace kpr
-{
-class Mutex
-{
-public:
- Mutex();
- ~Mutex();
-
- void Lock()const;
- void Unlock()const;
- bool TryLock()const;
- bool TryLock(int timeout) const;
-
- ThreadId GetOwner()const;
-
-private:
- Mutex(const Mutex&);
- const Mutex& operator=(const Mutex&);
-
- mutable pthread_mutex_t m_mutex;
- friend class Condition;
-};
-
-class RWMutex
-{
-public:
- RWMutex();
- ~RWMutex();
-
- void ReadLock()const;
- void WriteLock()const;
- bool TryReadLock()const;
- bool TryReadLock(int timeout) const;
- bool TryWriteLock()const;
- bool TryWriteLock(int timeout)const;
- void Unlock()const;
-
- ThreadId GetOwner()const;
-
-private:
- RWMutex(const RWMutex&);
- const RWMutex& operator=(const RWMutex&);
-
- mutable pthread_rwlock_t m_mutex;
- friend class Condition;
-};
-
-class RecursiveMutex
-{
-public:
- RecursiveMutex();
- ~RecursiveMutex();
-
- bool Lock()const;
- bool Unlock()const;
- bool TryLock()const;
-
- ThreadId GetOwner()const;
-
- unsigned int GetCount()const
- {
- return m_count;
- }
-
-private:
- RecursiveMutex(const RecursiveMutex&);
-
- const RecursiveMutex& operator=(const RecursiveMutex&);
-
- bool lock(int count);
- bool tryLock();
- bool unlock();
-
- unsigned int reset4Condvar();
-
-private:
- pthread_mutex_t m_mutex;
- Mutex m_internal;
- mutable unsigned int m_count;
- mutable ThreadId m_owner;
-
- friend class Condition;
- friend class ConditionHelper;
-};
-}
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/RefHandle.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/RefHandle.h b/rocketmq-client4cpp/src/kpr/RefHandle.h
deleted file mode 100644
index fd7d741..0000000
--- a/rocketmq-client4cpp/src/kpr/RefHandle.h
+++ /dev/null
@@ -1,328 +0,0 @@
-/**
- * Copyright (C) 2013 kangliqiang ,kangliq@163.com
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __KPR_REFHANDLET_H__
-#define __KPR_REFHANDLET_H__
-
-#include "KPRTypes.h"
-#include "AtomicValue.h"
-#include "Exception.h"
-
-namespace kpr
-{
-
-class RefCount
-{
-public:
- RefCount& operator=(const RefCount&)
- {
- return *this;
- }
-
- void incRef()
- {
- m_refCount++;
- }
-
- void decRef()
- {
- if (--m_refCount == 0 && !m_noDelete)
- {
- m_noDelete = true;
- delete this;
- }
- }
-
- int getRef() const
- {
- return m_refCount.get();
- }
-
- void setNoDelete(bool b)
- {
- m_noDelete = b;
- }
-
-protected:
- RefCount()
- : m_refCount(0), m_noDelete(false)
- {
- }
-
- RefCount(const RefCount&)
- : m_refCount(0), m_noDelete(false)
- {
- }
-
- virtual ~RefCount()
- {
- }
-
-protected:
- AtomicInteger m_refCount;
- bool m_noDelete;
-};
-
-
-
-template <class T>
-class RefHandleT
-{
-public:
- RefHandleT(T* p = 0)
- {
- m_ptr = p;
-
- if (m_ptr)
- {
- m_ptr->incRef();
- }
- }
-
- template<typename Y>
- RefHandleT(const RefHandleT<Y>& v)
- {
- m_ptr = v.m_ptr;
-
- if (m_ptr)
- {
- m_ptr->incRef();
- }
- }
-
- RefHandleT(const RefHandleT& v)
- {
- m_ptr = v.m_ptr;
-
- if (m_ptr)
- {
- m_ptr->incRef();
- }
- }
-
- ~RefHandleT()
- {
- if (m_ptr)
- {
- m_ptr->decRef();
- }
- }
-
- RefHandleT<T>& operator=(T* p)
- {
- if (m_ptr != p)
- {
- if (p)
- {
- p->incRef();
- }
-
- T* ptr = m_ptr;
- m_ptr = p;
-
- if (ptr)
- {
- ptr->decRef();
- }
- }
-
- return *this;
- }
-
- template<typename Y>
- RefHandleT<T>& operator=(const RefHandleT<Y>& v)
- {
- if (m_ptr != v.m_ptr)
- {
- if (v.m_ptr)
- {
- v.m_ptr->incRef();
- }
-
- T* ptr = m_ptr;
- m_ptr = v.m_ptr;
-
- if (ptr)
- {
- ptr->decRef();
- }
- }
-
- return *this;
- }
-
- RefHandleT<T>& operator=(const RefHandleT<T>& v)
- {
- if (m_ptr != v.m_ptr)
- {
- if (v.m_ptr)
- {
- v.m_ptr->incRef();
- }
-
- T* ptr = m_ptr;
- m_ptr = v.m_ptr;
-
- if (ptr)
- {
- ptr->decRef();
- }
- }
-
- return *this;
- }
-
- T* operator->() const
- {
- if (!m_ptr)
- {
- THROW_EXCEPTION(RefHandleNullException, "autoptr null handle error", -1);
- }
-
- return m_ptr;
- }
-
- T& operator*() const
- {
- if (!m_ptr)
- {
- THROW_EXCEPTION(RefHandleNullException, "autoptr null handle error", -1);
- }
-
- return *m_ptr;
- }
-
- operator T* () const
- {
- return m_ptr;
- }
-
- T* ptr() const
- {
- return m_ptr;
- }
-
- T* retn()
- {
- T* p = m_ptr;
- m_ptr = 0;
-
- return p;
- }
-
- bool operator==(const RefHandleT<T>& v) const
- {
- return m_ptr == v.m_ptr;
- }
-
- bool operator==(T* p) const
- {
- return m_ptr == p;
- }
-
- bool operator!=(const RefHandleT<T>& v) const
- {
- return m_ptr != v.m_ptr;
- }
-
- bool operator!=(T* p) const
- {
- return m_ptr != p;
- }
-
- bool operator!() const
- {
- return m_ptr == 0;
- }
-
- operator bool() const
- {
- return m_ptr != 0;
- }
-
- void swap(RefHandleT& other)
- {
- std::swap(m_ptr, other._ptr);
- }
-
- template<class Y>
- static RefHandleT dynamicCast(const RefHandleT<Y>& r)
- {
- return RefHandleT(dynamic_cast<T*>(r._ptr));
- }
-
- template<class Y>
- static RefHandleT dynamicCast(Y* p)
- {
- return RefHandleT(dynamic_cast<T*>(p));
- }
-
-public:
- T* m_ptr;
-};
-
-
-template<typename T, typename U>
-inline bool operator==(const RefHandleT<T>& lhs, const RefHandleT<U>& rhs)
-{
- T* l = lhs.ptr();
- U* r = rhs.ptr();
- if(l && r)
- {
- return *l == *r;
- }
- else
- {
- return !l && !r;
- }
-}
-
-
-template<typename T, typename U>
-inline bool operator!=(const RefHandleT<T>& lhs, const RefHandleT<U>& rhs)
-{
- T* l = lhs.ptr();
- U* r = rhs.ptr();
- if(l && r)
- {
- return *l != *r;
- }
- else
- {
- return l || r;
- }
-}
-
-
-template<typename T, typename U>
-inline bool operator<(const RefHandleT<T>& lhs, const RefHandleT<U>& rhs)
-{
- T* l = lhs.ptr();
- U* r = rhs.ptr();
- if(l && r)
- {
- return *l < *r;
- }
- else
- {
- return !l && r;
- }
-}
-
-}
-
-#define DECLAREVAR(T) typedef kpr::RefHandleT<T> T ## Ptr;
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/ScopedLock.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/ScopedLock.h b/rocketmq-client4cpp/src/kpr/ScopedLock.h
deleted file mode 100755
index 6ff9dd1..0000000
--- a/rocketmq-client4cpp/src/kpr/ScopedLock.h
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Copyright (C) 2013 kangliqiang ,kangliq@163.com
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __KPR_SCOPEDLOCK_H__
-#define __KPR_SCOPEDLOCK_H__
-
-namespace kpr
-{
-template <class T>
-class ScopedLock
-{
-public:
- ScopedLock(const T& mutex)
- : m_mutex(mutex)
- {
- m_mutex.Lock();
- }
-
- ~ScopedLock()
- {
- m_mutex.Unlock();
- }
-
-private:
- const T& m_mutex;
-};
-
-
-template <class T>
-class ScopedRLock
-{
-public:
- ScopedRLock(const T& mutex)
- : m_mutex(mutex)
- {
- m_mutex.ReadLock();
- m_acquired = true;
- }
-
- ~ScopedRLock()
- {
- if (m_acquired)
- {
- m_mutex.Unlock();
- }
- }
-
-private:
- const T& m_mutex;
- mutable bool m_acquired;
-};
-
-
-template <class T>
-class ScopedWLock
-{
-public:
- ScopedWLock(const T& mutex)
- : m_mutex(mutex)
- {
- m_mutex.WriteLock();
- m_acquired = true;
- }
-
- ~ScopedWLock()
- {
- if (m_acquired)
- {
- m_mutex.Unlock();
- }
- }
-
-private:
- const T& m_mutex;
- mutable bool m_acquired;
-};
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/Semaphore.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/Semaphore.cpp b/rocketmq-client4cpp/src/kpr/Semaphore.cpp
deleted file mode 100755
index 59a0eef..0000000
--- a/rocketmq-client4cpp/src/kpr/Semaphore.cpp
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Copyright (C) 2013 kangliqiang ,kangliq@163.com
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "Semaphore.h"
-
-#include <unistd.h>
-#include <sys/time.h>
-#include "KPRUtil.h"
-
-namespace kpr
-{
-Semaphore::Semaphore(long initial_count)
-{
- sem_init(&m_sem, 0, initial_count);
-}
-
-Semaphore::~Semaphore()
-{
- sem_destroy(&m_sem);
-}
-
-int Semaphore::GetValue()
-{
- int value = 0;
- int rc = sem_getvalue(&m_sem, &value);
- if (rc < 0)
- {
- return rc;
- }
- return value;
-}
-
-bool Semaphore::Wait()
-{
- int rc;
- rc = sem_wait(&m_sem);
- return !rc;
-}
-
-bool Semaphore::Wait(long timeout)
-{
- int rc;
- if (timeout < 0)
- {
- rc = sem_wait(&m_sem);
- }
- else
- {
- struct timespec abstime = KPRUtil::CalcAbsTime(timeout);
- rc = sem_timedwait(&m_sem, &abstime);
- }
-
- return !rc;
-}
-
-void Semaphore::Release(int count)
-{
- sem_post(&m_sem);
-}
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/Semaphore.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/Semaphore.h b/rocketmq-client4cpp/src/kpr/Semaphore.h
deleted file mode 100755
index 2a1af7f..0000000
--- a/rocketmq-client4cpp/src/kpr/Semaphore.h
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Copyright (C) 2013 kangliqiang ,kangliq@163.com
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __KPR_SEMAPHORE_H__
-#define __KPR_SEMAPHORE_H__
-
-#include "KPRTypes.h"
-#include <errno.h>
-
-namespace kpr
-{
-
-class Semaphore
-{
-public:
- Semaphore(long initial_count = 0);
- ~Semaphore();
-
- int GetValue();
- bool Wait();
- bool Wait(long timeout);
-
- void Release(int count = 1);
-
-private:
- sem_t m_sem;
-};
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/Thread.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/Thread.cpp b/rocketmq-client4cpp/src/kpr/Thread.cpp
deleted file mode 100755
index d80819b..0000000
--- a/rocketmq-client4cpp/src/kpr/Thread.cpp
+++ /dev/null
@@ -1,191 +0,0 @@
-/**
- * Copyright (C) 2013 kangliqiang ,kangliq@163.com
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "Thread.h"
-
-#include <string.h>
-#include <stdlib.h>
-#include <stdio.h>
-#include <errno.h>
-#include <assert.h>
-#include <unistd.h>
-#include <sys/types.h>
-#include <signal.h>
-
-#include "ScopedLock.h"
-#include "Exception.h"
-
-//for log
-#include "RocketMQClient.h"
-
-namespace kpr
-{
-kpr::AtomicInteger Thread::s_threadNumber = 0;
-
-void* Thread::ThreadRoute(void* pArg)
-{
- Thread* tv = ((Thread*)pArg);
-
- try
- {
- tv->Startup();
- }
- catch (...)
- {
- }
-
- try
- {
- tv->Cleanup();
- }
- catch (...)
- {
- }
-
- return 0;
-}
-
-Thread::Thread(const char* name)
-{
- m_started = false;
- m_threadId = ThreadId();
- m_threadNumber = s_threadNumber++;
-
- SetName(name);
-}
-
-Thread::~Thread()
-{
- try
- {
- }
- catch (...)
- {
- }
-}
-
-void Thread::SetName(const char* name)
-{
- ScopedLock<Mutex> guard(m_mutex);
-
- if (name == NULL)
- {
- snprintf(m_name, sizeof(m_name), "Thread-%u", m_threadNumber);
- }
- else
- {
- snprintf(m_name, sizeof(m_name), "%s", name);
- }
-}
-
-const char* Thread::GetName() const
-{
- ScopedLock<Mutex> guard(m_mutex);
- return m_name;
-}
-
-void Thread::Start()
-{
- ScopedLock<Mutex> guard(m_mutex);
-
- if (m_started)
- {
- return;
- }
-
- pthread_attr_t attr;
- int retcode = 0;
- retcode = pthread_attr_init(&attr);
- if (retcode != 0)
- {
- THROW_EXCEPTION(SystemCallException, "pthread_attr_init failed!", errno)
- }
-
- pthread_t id;
- retcode = pthread_create(&id, &attr, ThreadRoute, (void*)this);
- if (retcode != 0)
- {
- THROW_EXCEPTION(SystemCallException, "pthread_create error", errno)
- }
-
- m_threadId = id;
- pthread_attr_destroy(&attr);
- m_started = true;
- RMQ_DEBUG("thread[%s][%ld] start successfully", m_name, (long)id);
-}
-
-void Thread::Run()
-{
- //TODO support runable
-}
-
-bool Thread::IsAlive() const
-{
- if (m_started)
- {
- int retcode = pthread_kill(m_threadId, 0);
- return (retcode == ESRCH);
- }
-
- return false;
-}
-
-void Thread::Join()
-{
- if (m_started)
- {
- pthread_join(m_threadId, NULL);
- }
-}
-
-void Thread::Sleep(long millis, int nanos)
-{
- assert(millis >= 0 && nanos >= 0 && nanos < 999999);
- struct timespec tv;
- tv.tv_sec = millis / 1000;
- tv.tv_nsec = (millis % 1000) * 1000000 + nanos;
- nanosleep(&tv, 0);
-}
-
-void Thread::Yield()
-{
- pthread_yield();
-}
-
-ThreadId Thread::GetId() const
-{
- ScopedLock<Mutex> guard(m_mutex);
- return m_threadId;
-}
-
-void Thread::Startup()
-{
- try
- {
- RMQ_INFO("thread[%s] started", GetName());
- Run();
- }
- catch (...)
- {
- }
-}
-
-void Thread::Cleanup()
-{
- RMQ_INFO("thread[%s] end", GetName());
-}
-
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/Thread.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/Thread.h b/rocketmq-client4cpp/src/kpr/Thread.h
deleted file mode 100755
index ef2590e..0000000
--- a/rocketmq-client4cpp/src/kpr/Thread.h
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Copyright (C) 2013 kangliqiang ,kangliq@163.com
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __KPR_THREAD_H__
-#define __KPR_THREAD_H__
-
-#include "KPRTypes.h"
-#include "RefHandle.h"
-#include "Mutex.h"
-
-#ifdef Yield
-#undef Yield
-#endif
-
-namespace kpr
-{
-class Thread : public virtual kpr::RefCount
-{
-public:
- Thread(const char* name = NULL);
- virtual ~Thread();
-
- virtual void Run();
- void Start();
- bool IsAlive() const;
- void Join();
- ThreadId GetId() const;
-
- void SetName(const char*);
- const char* GetName() const;
-
- void Startup();
- void Cleanup();
-
- static void Sleep(long millis, int nano = 0);
- static void Yield();
-
-private:
- Thread(const Thread&);
- const Thread& operator=(const Thread&);
- static void* ThreadRoute(void* pArg);
-
-private:
- ThreadId m_threadId;
- unsigned int m_threadNumber;
- char m_name[128];
- bool m_started;
- Mutex m_mutex;
-
- static kpr::AtomicInteger s_threadNumber;
-};
-typedef kpr::RefHandleT<Thread> ThreadPtr;
-
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/ThreadLocal.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/ThreadLocal.cpp b/rocketmq-client4cpp/src/kpr/ThreadLocal.cpp
deleted file mode 100755
index 32cba5b..0000000
--- a/rocketmq-client4cpp/src/kpr/ThreadLocal.cpp
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Copyright (C) 2013 kangliqiang ,kangliq@163.com
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "ThreadLocal.h"
-
-#include <errno.h>
-
-#include "Exception.h"
-
-namespace kpr
-{
-ThreadLocal::ThreadLocal()
- : m_Key(0)
-{
- int retcode = 0;
-
- retcode = pthread_key_create(&m_Key, 0);
- if (retcode != 0)
- {
- THROW_EXCEPTION(SystemCallException, "pthread_key_create error", errno);
- }
-}
-
-ThreadLocal::~ThreadLocal()
-{
- pthread_key_delete(m_Key);
-}
-
-void* ThreadLocal::GetValue()
-{
- void* v;
- v = pthread_getspecific(m_Key);
- return v;
-}
-
-void ThreadLocal::SetValue(void* value)
-{
- int retcode = pthread_setspecific(m_Key, value);
- if (retcode != 0)
- {
- THROW_EXCEPTION(SystemCallException, "pthread_setspecific error", errno);
- }
-}
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/ThreadLocal.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/ThreadLocal.h b/rocketmq-client4cpp/src/kpr/ThreadLocal.h
deleted file mode 100644
index 9ec8f43..0000000
--- a/rocketmq-client4cpp/src/kpr/ThreadLocal.h
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Copyright (C) 2013 kangliqiang ,kangliq@163.com
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __KPR_THREADLOCAL_H__
-#define __KPR_THREADLOCAL_H__
-
-#include "KPRTypes.h"
-
-namespace kpr
-{
-class ThreadLocal
-{
-public:
- ThreadLocal();
- virtual ~ThreadLocal();
-
- void* GetValue();
- void SetValue(void* value);
-
-private:
- ThreadKey m_Key;
-};
-};
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/ThreadPool.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/ThreadPool.cpp b/rocketmq-client4cpp/src/kpr/ThreadPool.cpp
deleted file mode 100755
index 32557a8..0000000
--- a/rocketmq-client4cpp/src/kpr/ThreadPool.cpp
+++ /dev/null
@@ -1,418 +0,0 @@
-/**
- * Copyright (C) 2013 kangliqiang ,kangliq@163.com
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "ThreadPool.h"
-
-#include "RocketMQClient.h"
-#include "ScopedLock.h"
-#include "KPRUtil.h"
-
-namespace kpr
-{
-ThreadPoolWorker:: ThreadPoolWorker(ThreadPool* pThreadPool, const char* strName)
- : kpr::Thread(strName),
- m_pThreadPool(pThreadPool),
- m_canWork(false),
- m_isWaiting(false),
- m_stop(false),
- m_idleTime(0),
- m_idle(true)
-{
-
-}
-
-bool ThreadPoolWorker::IsIdle()
-{
- kpr::ScopedLock<kpr::Monitor> lock(*this);
- return m_idle;
-}
-
-void ThreadPoolWorker:: SetIdle(bool idle)
-{
- kpr::ScopedLock<kpr::Monitor> lock(*this);
- m_idle = idle;
- m_idleTime = 0;
-}
-
-int ThreadPoolWorker::IdleTime(int idleTime)
-{
- if (m_idle)
- {
- m_idleTime += idleTime;
- }
- else
- {
- m_idleTime = 0;
- }
-
- return m_idleTime;
-}
-
-void ThreadPoolWorker::Run()
-{
- while (!m_stop)
- {
- SetIdle(true);
- {
- kpr::ScopedLock<kpr::Monitor> lock(*this);
- while (!m_canWork)
- {
- try
- {
- m_isWaiting = true;
- Wait();
- m_isWaiting = false;
- }
- catch (...)
- {
- }
- }
-
- m_canWork = false;
- }
-
- while (!m_stop)
- {
- ThreadPoolWorkPtr request = m_pThreadPool->GetWork(this);
- if ((ThreadPoolWork*)(NULL) == request)
- {
- break;
- }
-
- SetIdle(false);
-
- try
- {
- request->Do();
- }
- catch(...)
- {
- RMQ_ERROR("thead[%s] doWork exception", GetName());
- }
-
- //delete request;
- request = NULL;
- }
-
- if (m_stop || m_pThreadPool->IsDestroy())
- {
- break;
- }
- }
-
- m_pThreadPool ->RemoveThread(this);
- m_pThreadPool = NULL;
-}
-
-void ThreadPoolWorker::WakeUp()
-{
- SetIdle(false);
- kpr::ScopedLock<kpr::Monitor> lock(*this);
- m_canWork = true;
- Notify();
-}
-
-void ThreadPoolWorker::Stop()
-{
- kpr::ScopedLock<kpr::Monitor> lock(*this);
- m_canWork = true;
- m_stop = true;
- Notify();
-}
-
-bool ThreadPoolWorker:: IsWaiting()
-{
- kpr::ScopedLock<kpr::Monitor> lock(*this);
- return m_isWaiting;
-}
-
-ThreadPool::ThreadPool(const char* name,
- int count,
- int minCount,
- int maxCount,
- int step,
- int maxIdleTime,
- int checkldleThreadsInterval)
-{
- if (name == NULL)
- {
- snprintf(m_name, sizeof(m_name), "ThreadPool");
- }
- else
- {
- snprintf(m_name, sizeof(m_name), "%s", name);
- }
-
- m_destroy = false;
- m_minCount = minCount;
- m_maxCount = maxCount;
- m_maxIdleTime = maxIdleTime;
- m_count = 0;
- m_step = step;
- m_index = 0;
-
- m_lastRemoveIdleThreadsTime = KPRUtil::GetCurrentTimeMillis();
-
- if (m_minCount <= 0)
- {
- m_minCount = MIN_THREAD_COUNT;
- }
-
- if (m_maxCount < 0)
- {
- m_maxCount = MAX_THREAD_COUNT;
- }
-
- if (m_maxIdleTime < 0)
- {
- m_maxIdleTime = MAX_IDLE_THREAD_TIME;
- }
-
- if (m_maxCount != 0 && m_maxCount < m_minCount)
- {
- m_minCount = MIN_THREAD_COUNT;
- }
-
- if ((m_maxCount != 0 && count > m_maxCount) || count < m_minCount)
- {
- count = m_minCount;
- }
-
- if (checkldleThreadsInterval < 0)
- {
- checkldleThreadsInterval = CHECK_IDLE_THREADS_INTERVAL;
- }
-
- AddThreads(count);
-
- char manager_name[32];
- snprintf(manager_name, sizeof(manager_name), "%s-manager", m_name);
- m_manager = new ThreadPoolManage(manager_name, this, checkldleThreadsInterval);
- m_manager->Start();
-}
-
-ThreadPool::~ThreadPool()
-{
- Destroy();
-}
-
-void ThreadPool::AddThreads(int count)
-{
- char threadName[256];
-
- for (int i = 0; i < count; ++i)
- {
- snprintf(threadName, sizeof(threadName), "%s-Worker%d", m_name, m_index);
-
- try
- {
- ThreadPoolWorkerPtr worker = new ThreadPoolWorker(this, threadName);
- worker->Start();
-
- m_workers.push_back(worker);
- while (!worker->IsWaiting())
- {
- kpr::Thread::Sleep(0, 100000);
- }
-
- m_index++;
- m_count++;
- }
- catch (...)
- {
- RMQ_ERROR("ThreadPool thead[%s] new exception", threadName);
- }
- }
-}
-
-void ThreadPool::Destroy()
-{
- std::list<ThreadPoolWorkerPtr> workers;
- {
- kpr::ScopedLock<kpr::Monitor> lock(*this);
- if (m_destroy)
- {
- return;
- }
-
- m_destroy = true;
-
- std::list<ThreadPoolWorkerPtr>::iterator iter;
- for (iter = m_workers.begin(); iter != m_workers.end(); iter++)
- {
- workers.push_back(*iter);
- (*iter)->Stop();
- }
- }
-
- m_manager->Stop();
- m_manager->Join();
-
- std::list<ThreadPoolWorkerPtr>::iterator itThread;
- for (itThread = workers.begin(); itThread != workers.end(); itThread++)
- {
- (*itThread)->Join();
- }
- m_works.clear();
-}
-
-int ThreadPool::AddWork(ThreadPoolWorkPtr pWork)
-{
- kpr::ScopedLock<kpr::Monitor> lock(*this);
- if (m_destroy)
- {
- return -1;
- }
-
- m_works.push_back(pWork);
-
- if (!WakeOneThread())
- {
- if (0 == m_maxCount || m_count < m_maxCount)
- {
- int step = m_step;
-
- if (0 < m_maxCount && m_count + m_step > m_maxCount)
- {
- step = m_maxCount - m_count;
- }
-
- AddThreads(step);
- WakeOneThread();
- }
- }
-
- return 0;
-}
-
-ThreadPoolWorkPtr ThreadPool::GetWork(ThreadPoolWorker* pWorker)
-{
- kpr::ScopedLock<kpr::Monitor> lock(*this);
- ThreadPoolWorkPtr result = NULL;
-
- if (!m_destroy && !m_works.empty())
- {
- result = m_works.front();
- m_works.pop_front();
- }
-
- return result;
-}
-
-bool ThreadPool::IsDestroy()
-{
- kpr::ScopedLock<kpr::Monitor> lock(*this);
- return m_destroy;
-}
-
-void ThreadPool::RemoveThread(ThreadPoolWorker* workerThread)
-{
- kpr::ScopedLock<kpr::Monitor> lock(*this);
-
- std::list<ThreadPoolWorkerPtr>::iterator it = m_workers.begin();
-
- for (; it != m_workers.end(); it++)
- {
- if ((*it) == workerThread)
- {
- m_workers.erase(it);
- m_count--;
- break;
- }
- }
-}
-
-void ThreadPool::RemoveIdleThreads()
-{
- kpr::ScopedLock<kpr::Monitor> lock(*this);
-
- if (m_maxIdleTime == 0)
- {
- return;
- }
-
- unsigned long long time = KPRUtil::GetCurrentTimeMillis();
- int interval = (int)(time - m_lastRemoveIdleThreadsTime);
- m_lastRemoveIdleThreadsTime = time;
-
- std::list<ThreadPoolWorkerPtr>::iterator it = m_workers.begin();
- int size = (int)m_workers.size();
- while (size > m_minCount && it != m_workers.end())
- {
- if ((*it)->IdleTime(interval) > m_maxIdleTime)
- {
- (*it)->Stop();
- size--;
- }
-
- it++;
- }
-}
-
-bool ThreadPool::WakeOneThread()
-{
- std::list<ThreadPoolWorkerPtr>::iterator it = m_workers.begin();
- for (; it != m_workers.end(); it++)
- {
- if ((*it)->IsIdle())
- {
- (*it)->WakeUp();
- return true;
- }
- }
-
- return false;
-}
-
-ThreadPoolManage::ThreadPoolManage(const char* name, ThreadPool* pThreadPool, int checkldleThreadsInterval)
- : kpr::Thread(name),
- m_pThreadPool(pThreadPool),
- m_stop(false),
- m_checkIdleThreadsInterval(checkldleThreadsInterval)
-{
-}
-
-ThreadPoolManage::~ThreadPoolManage()
-{
-}
-
-void ThreadPoolManage::Stop()
-{
- kpr::ScopedLock<kpr::Monitor> lock(*this);
- m_stop = true;
- Notify();
-}
-
-void ThreadPoolManage::Run()
-{
- while (!m_stop)
- {
- {
- kpr::ScopedLock<kpr::Monitor> lock(*this);
- if (!m_stop)
- {
- Wait(m_checkIdleThreadsInterval);
- }
-
- if (m_stop)
- {
- break;
- }
- }
-
- m_pThreadPool->RemoveIdleThreads();
- }
-}
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/ThreadPool.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/ThreadPool.h b/rocketmq-client4cpp/src/kpr/ThreadPool.h
deleted file mode 100755
index 2c7e3ff..0000000
--- a/rocketmq-client4cpp/src/kpr/ThreadPool.h
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Copyright (C) 2013 kangliqiang ,kangliq@163.com
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __KPR_THREADPOOL_H__
-#define __KPR_THREADPOOL_H__
-
-#include<time.h>
-#include <assert.h>
-#include <list>
-#include "Mutex.h"
-#include "Thread.h"
-#include "Monitor.h"
-
-#include "ThreadPoolWork.h"
-
-namespace kpr
-{
-const int MAX_THREAD_COUNT = 300;
-const int MIN_THREAD_COUNT = 1;
-//const int MAX_IDLE_THREAD_TIME = 600000;
-const int MAX_IDLE_THREAD_TIME = 0;
-const int THREAD_STEP = 10;
-const int CHECK_IDLE_THREADS_INTERVAL = 30000;
-
-class ThreadPool;
-class ThreadPoolWorker : public kpr::Thread, public kpr::Monitor
-{
-public:
- ThreadPoolWorker(ThreadPool* pThreadPool, const char* strName);
-
- virtual void Run();
- void WakeUp();
- void Stop();
- bool IsWaiting();
- bool IsIdle();
- void SetIdle(bool idle);
- int IdleTime(int idleTime);
-
-private:
- ThreadPool* m_pThreadPool;
- bool m_canWork;
- bool m_isWaiting;
- bool m_stop;
- int m_idleTime;
- bool m_idle;
-};
-typedef kpr::RefHandleT<ThreadPoolWorker> ThreadPoolWorkerPtr;
-
-class ThreadPoolManage : public kpr::Thread, public kpr::Monitor
-{
-public:
- ThreadPoolManage(const char* name, ThreadPool* pThreadPool, int nCheckldleThreadsInterval);
-
- ~ThreadPoolManage();
- virtual void Run();
- void Stop();
-
-private:
- ThreadPool* m_pThreadPool;
- bool m_stop;
- int m_checkIdleThreadsInterval;
-};
-typedef kpr::RefHandleT<ThreadPoolManage> ThreadPoolManagePtr;
-
-
-class ThreadPool : public kpr::RefCount, public kpr::Monitor
-{
-public:
- ThreadPool(const char* name,
- int initCount,
- int minCount,
- int maxCount,
- int step = THREAD_STEP,
- int maxIdleTime = MAX_IDLE_THREAD_TIME,
- int checkldleThreadsInterval = CHECK_IDLE_THREADS_INTERVAL);
-
- ~ThreadPool();
- void Destroy();
-
- int AddWork(ThreadPoolWorkPtr pWork);
- ThreadPoolWorkPtr GetWork(ThreadPoolWorker* pWorker);
-
- void RemoveIdleThreads();
- void RemoveThread(ThreadPoolWorker* pWorker);
-
- bool WakeOneThread();
- bool IsDestroy();
-
-private:
- void AddThreads(int count);
-
-private:
- bool m_destroy;
- int m_minCount;
- int m_maxCount;
- int m_maxIdleTime;
- int m_count;
- int m_step;
-
- char m_name[128];
- unsigned int m_index;
- unsigned long long m_lastRemoveIdleThreadsTime;
-
- ThreadPoolManagePtr m_manager;
- std::list<ThreadPoolWorkPtr> m_works;
- std::list<ThreadPoolWorkerPtr> m_workers;
-};
-
-typedef kpr::RefHandleT<ThreadPool> ThreadPoolPtr;
-
-}
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/ThreadPoolWork.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/ThreadPoolWork.h b/rocketmq-client4cpp/src/kpr/ThreadPoolWork.h
deleted file mode 100644
index 30dfe6c..0000000
--- a/rocketmq-client4cpp/src/kpr/ThreadPoolWork.h
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __THREADPOOLWORK_H__
-#define __THREADPOOLWORK_H__
-
-#include "RefHandle.h"
-
-namespace kpr
-{
-
-class ThreadPoolWork : public kpr::RefCount
-{
-public:
- virtual ~ThreadPoolWork() {}
- virtual void Do() = 0;
-};
-typedef kpr::RefHandleT<ThreadPoolWork> ThreadPoolWorkPtr;
-
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/TimerTaskManager.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/TimerTaskManager.cpp b/rocketmq-client4cpp/src/kpr/TimerTaskManager.cpp
deleted file mode 100755
index 42ef672..0000000
--- a/rocketmq-client4cpp/src/kpr/TimerTaskManager.cpp
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#include "TimerTaskManager.h"
-#include "ThreadPool.h"
-#include "ScopedLock.h"
-
-namespace kpr
-{
-TimerTaskManager::TimerTaskManager()
-{
-}
-
-TimerTaskManager::~TimerTaskManager()
-{
-}
-
-int TimerTaskManager::Init(int maxThreadCount, int checklnteval)
-{
- try
- {
- m_pThreadPool = new ThreadPool("TimerThreadPool", 5, 5, maxThreadCount);
- m_timerThread = new TimerThread("TimerThread", checklnteval);
- m_timerThread->Start();
- }
- catch (...)
- {
- return -1;
- }
-
- return 0;
-}
-
-unsigned int TimerTaskManager::RegisterTimer(unsigned int initialDelay, unsigned int elapse, TimerTaskPtr pTask)
-{
- unsigned int id = m_timerThread->RegisterTimer(initialDelay, elapse, this, true);
-
- kpr::ScopedLock<kpr::Mutex> lock(m_mutex);
- m_timerTasks[id] = pTask;
-
- return id;
-}
-
-bool TimerTaskManager::UnRegisterTimer(unsigned int timerId)
-{
- bool ret = m_timerThread->UnRegisterTimer(timerId);
-
- kpr::ScopedLock<kpr::Mutex> lock(m_mutex);
- m_timerTasks.erase(timerId);
-
- return ret;
-}
-
-bool TimerTaskManager::ResetTimer(unsigned int timerId)
-{
- return m_timerThread->ResetTimer(timerId);
-}
-
-void TimerTaskManager::OnTimeOut(unsigned int timerId)
-{
- kpr::ScopedLock<kpr::Mutex> lock(m_mutex);
- std::map<unsigned int, TimerTaskPtr>::iterator it = m_timerTasks.find(timerId);
- if (it != m_timerTasks.end())
- {
- if (!it->second->IsProcessing())
- {
- it->second->SetProcessing(true);
- m_pThreadPool->AddWork((it->second).ptr());
- }
- }
-}
-
-void TimerTaskManager::Stop()
-{
- m_timerThread->Stop();
- m_timerThread->Join();
- m_pThreadPool->Destroy();
-}
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/TimerTaskManager.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/TimerTaskManager.h b/rocketmq-client4cpp/src/kpr/TimerTaskManager.h
deleted file mode 100755
index b9cc2e0..0000000
--- a/rocketmq-client4cpp/src/kpr/TimerTaskManager.h
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __KPR_TIMERTASKMANAGER_H__
-#define __KPR_TIMERTASKMANAGER_H__
-
-#include <list>
-#include <map>
-
-#include "RocketMQClient.h"
-#include "TimerThread.h"
-#include "ThreadPool.h"
-#include "ThreadPoolWork.h"
-
-namespace kpr
-{
-
-class TimerTask : public kpr::ThreadPoolWork
-{
-public:
- TimerTask()
- : m_isProcessing(false)
- {
- }
-
- virtual ~TimerTask()
- {
- }
-
- virtual void Do()
- {
- try
- {
- DoTask();
- }
- catch(...)
- {
- RMQ_ERROR("TimerTask exception");
- }
- m_isProcessing = false;
- }
-
- bool IsProcessing()
- {
- return m_isProcessing;
- }
-
- void SetProcessing(bool isProcessing)
- {
- m_isProcessing = isProcessing;
- }
-
- virtual void DoTask() = 0;
-
-private:
- bool m_isProcessing;
-};
-typedef kpr::RefHandleT<TimerTask> TimerTaskPtr;
-
-
-class TimerTaskManager : public TimerHandler
-{
-public:
- TimerTaskManager();
- virtual ~TimerTaskManager();
-
- int Init(int maxThreadCount, int checklnteval);
- unsigned int RegisterTimer(unsigned int initialDelay, unsigned int elapse, TimerTaskPtr pTask);
- bool UnRegisterTimer(unsigned int timerId);
- bool ResetTimer(unsigned int timerId);
- void Stop();
-
- virtual void OnTimeOut(unsigned int timerId);
-
-private:
- std::map<unsigned int, TimerTaskPtr> m_timerTasks;
- kpr::Mutex m_mutex;
- TimerThreadPtr m_timerThread;
- kpr::ThreadPoolPtr m_pThreadPool;
-};
-
-}
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/TimerThread.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/TimerThread.cpp b/rocketmq-client4cpp/src/kpr/TimerThread.cpp
deleted file mode 100755
index b127074..0000000
--- a/rocketmq-client4cpp/src/kpr/TimerThread.cpp
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#include "TimerThread.h"
-#include "KPRUtil.h"
-#include "ScopedLock.h"
-
-namespace kpr
-{
-unsigned int TimerThread::s_nextTimerID = 0;
-
-TimerThread::TimerThread(const char* name, unsigned int checklnterval)
- : kpr::Thread(name), m_closed(false), m_checkInterval(checklnterval)
-{
-}
-
-TimerThread::~TimerThread()
-{
-}
-
-void TimerThread::Run()
-{
- unsigned long long lastCheckTime = KPRUtil::GetCurrentTimeMillis();
- unsigned long long currentCheckTime = lastCheckTime;
-
- while (!m_closed)
- {
- currentCheckTime = KPRUtil::GetCurrentTimeMillis();
- unsigned int elapse = (unsigned int)(currentCheckTime - lastCheckTime);
-
- std::list<TimerInfo> timeList;
-
- CheckTimeOut(elapse, timeList);
-
- if (!timeList.empty())
- {
- std::list<TimerInfo>::iterator it = timeList.begin();
- for (; it != timeList.end(); it++)
- {
- try
- {
- it->pTimerHandler->OnTimeOut(it->id);
- }
- catch(...)
- {
- RMQ_ERROR("TimerThread[%s] OnTimeOut exception", GetName());
- }
- }
- }
-
- unsigned long long checkEndTime = KPRUtil::GetCurrentTimeMillis();
- int sleepTime = m_checkInterval - (int)(checkEndTime - currentCheckTime);
- if (sleepTime < 0)
- {
- sleepTime = 0;
- }
-
- lastCheckTime = currentCheckTime;
-
- try
- {
- kpr::ScopedLock<kpr::Monitor> lock(*this);
- Wait(sleepTime);
- }
- catch (...)
- {
- }
- }
-}
-
-void TimerThread::Stop()
-{
- m_closed = true;
- kpr::ScopedLock<kpr::Monitor> lock(*this);
- Notify();
-}
-
-unsigned int TimerThread::RegisterTimer(unsigned int initialDelay, unsigned int elapse, TimerHandler* pHandler, bool persistent)
-{
- TimerInfo info;
- info.elapse = elapse;
- info.outTime = elapse - initialDelay;
- info.pTimerHandler = pHandler;
- info.persistent = persistent;
-
- kpr::ScopedLock<kpr::Mutex> lock(m_mutex);
- info.id = GetNextTimerID();
- m_timers[info.id] = info;
-
- return info.id;
-}
-
-bool TimerThread::UnRegisterTimer(unsigned int timerId)
-{
- bool result = false;
- kpr::ScopedLock<kpr::Mutex> lock(m_mutex);
- std::map<unsigned int, TimerInfo>::iterator it = m_timers.find(timerId);
- if (it != m_timers.end())
- {
- m_timers.erase(it);
- result = true;
- }
-
- return result;
-}
-
-bool TimerThread::ResetTimer(unsigned int timerId)
-{
- bool result = false;
- kpr::ScopedLock<kpr::Mutex> lock(m_mutex);
- std::map<unsigned int, TimerInfo>::iterator it = m_timers.find(timerId);
- if (it != m_timers.end())
- {
- if (it->second.persistent)
- {
- it->second.outTime = it->second.elapse;
- }
- else
- {
- it->second.outTime = 0;
- }
-
- result = true;
- }
-
- return result;
-}
-
-bool TimerThread::CheckTimeOut(unsigned int elapse, std::list<TimerInfo>& timerList)
-{
- bool result = false;
- timerList.clear();
-
- kpr::ScopedLock<kpr::Mutex> lock(m_mutex);
- if (!m_timers.empty())
- {
- std::map<unsigned int, TimerInfo>::iterator it = m_timers.begin();
- while (it != m_timers.end())
- {
- it->second.outTime += elapse;
-
- if (it->second.outTime >= int(it->second.elapse))
- {
- timerList.push_back(it->second);
-
- if (it->second.persistent)
- {
- it->second.outTime = 0;
- ++it;
- }
- else
- {
- std::map<unsigned int, TimerInfo>::iterator it1 = it;
- ++it;
- m_timers.erase(it1);
- }
- }
- else
- {
- ++it;
- }
- }
-
- result = true;
- }
-
- return result;
-}
-
-unsigned int TimerThread::GetNextTimerID()
-{
- return ++s_nextTimerID;
-}
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/kpr/TimerThread.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/TimerThread.h b/rocketmq-client4cpp/src/kpr/TimerThread.h
deleted file mode 100755
index 7e02a79..0000000
--- a/rocketmq-client4cpp/src/kpr/TimerThread.h
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Copyright (C) 2013 kangliqiang ,kangliq@163.com
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __KPR_TIMERTHREAD_H__
-#define __KPR_TIMERTHREAD_H__
-
-#include <list>
-#include <map>
-
-#include "RocketMQClient.h"
-#include "Thread.h"
-#include "Mutex.h"
-#include "Monitor.h"
-
-namespace kpr
-{
-class TimerHandler
-{
-public:
- TimerHandler()
- {
- }
-
- virtual ~TimerHandler()
- {
- }
-
- virtual void OnTimeOut(unsigned int timerID) = 0;
-};
-
-typedef struct tagTimerlnfo
-{
- unsigned int id;
- unsigned int elapse;
- int outTime;
- bool persistent;
- TimerHandler* pTimerHandler;
-} TimerInfo;
-
-
-class TimerThread : public kpr::Thread, public kpr::Monitor
-{
-public:
- TimerThread(const char* name, unsigned int checklnterval);
- virtual ~TimerThread();
- virtual void Run();
- virtual void Stop();
-
- virtual unsigned int RegisterTimer(unsigned int initialDelay, unsigned int elapse, TimerHandler* pHandler, bool persistent = true);
- virtual bool UnRegisterTimer(unsigned int timerId);
- virtual bool ResetTimer(unsigned int timerId);
-
-private:
- bool CheckTimeOut(unsigned int elapse, std::list<TimerInfo>& timerList);
- static unsigned int GetNextTimerID();
-
-private:
- static unsigned int s_nextTimerID;
- std::map<unsigned int, TimerInfo> m_timers;
- kpr::Mutex m_mutex;
- bool m_closed;
- unsigned int m_checkInterval;
-};
-typedef kpr::RefHandleT<TimerThread> TimerThreadPtr;
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/message/Message.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/message/Message.cpp b/rocketmq-client4cpp/src/message/Message.cpp
deleted file mode 100755
index db88c3e..0000000
--- a/rocketmq-client4cpp/src/message/Message.cpp
+++ /dev/null
@@ -1,379 +0,0 @@
-/**
- * Copyright (C) 2013 kangliqiang ,kangliq@163.com
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "Message.h"
-
-#include <string.h>
-#include <stdlib.h>
-#include <stdio.h>
-#include "UtilAll.h"
-
-
-namespace rmq
-{
-
-const std::string Message::PROPERTY_KEYS = "KEYS";
-const std::string Message::PROPERTY_TAGS = "TAGS";
-const std::string Message::PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
-const std::string Message::PROPERTY_DELAY_TIME_LEVEL = "DELAY";
-const std::string Message::PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
-const std::string Message::PROPERTY_REAL_TOPIC = "REAL_TOPIC";
-const std::string Message::PROPERTY_REAL_QUEUE_ID = "REAL_QID";
-const std::string Message::PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
-const std::string Message::PROPERTY_PRODUCER_GROUP = "PGROUP";
-const std::string Message::PROPERTY_MIN_OFFSET = "MIN_OFFSET";
-const std::string Message::PROPERTY_MAX_OFFSET = "MAX_OFFSET";
-const std::string Message::PROPERTY_BUYER_ID = "BUYER_ID";
-const std::string Message::PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID";
-const std::string Message::PROPERTY_TRANSFER_FLAG = "TRANSFER_FLAG";
-const std::string Message::PROPERTY_CORRECTION_FLAG = "CORRECTION_FLAG";
-const std::string Message::PROPERTY_MQ2_FLAG = "MQ2_FLAG";
-const std::string Message::PROPERTY_RECONSUME_TIME = "RECONSUME_TIME";
-const std::string Message::PROPERTY_MSG_REGION = "MSG_REGION";
-const std::string Message::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY";
-const std::string Message::PROPERTY_MAX_RECONSUME_TIMES = "MAX_RECONSUME_TIMES";
-const std::string Message::PROPERTY_CONSUME_START_TIMESTAMP = "CONSUME_START_TIME";
-const std::string Message::KEY_SEPARATOR = " ";
-
-Message::Message()
-{
- Init("", "", "", 0, NULL, 0, true);
-}
-
-Message::Message(const std::string& topic, const char* body, int len)
-{
- Init(topic, "", "", 0, body, len, true);
-}
-
-Message::Message(const std::string& topic, const std::string& tags, const char* body, int len)
-{
- Init(topic, tags, "", 0, body, len, true);
-}
-
-Message::Message(const std::string& topic, const std::string& tags, const std::string& keys, const char* body, int len)
-{
- Init(topic, tags, keys, 0, body, len, true);
-}
-
-Message::Message(const std::string& topic,
- const std::string& tags,
- const std::string& keys,
- const int flag,
- const char* body,
- int len,
- bool waitStoreMsgOK)
-{
- Init(topic, tags, keys, flag, body, len, waitStoreMsgOK);
-}
-
-Message::~Message()
-{
- if (m_body)
- {
- free(m_body);
- m_body = NULL;
- m_bodyLen = 0;
- }
-
- if (m_compressBody)
- {
- free(m_compressBody);
- m_compressBody = NULL;
- m_compressBodyLen = 0;
- }
-}
-
-Message::Message(const Message& other)
-{
- m_body = (char*)malloc(other.m_bodyLen);
- m_bodyLen = other.m_bodyLen;
- memcpy(m_body, other.m_body, other.m_bodyLen);
-
- m_compressBody = NULL;
- m_compressBodyLen = 0;
-
- m_topic = other.m_topic;
- m_flag = other.m_flag;
- m_properties = other.m_properties;
-}
-
-Message& Message::operator=(const Message& other)
-{
- if (this != &other)
- {
- if (m_body)
- {
- free(m_body);
- m_body = NULL;
- m_bodyLen = 0;
- }
-
- if (m_compressBody)
- {
- free(m_compressBody);
- m_compressBody = NULL;
- m_compressBodyLen = 0;
- }
-
- m_body = (char*)malloc(other.m_bodyLen);;
- m_bodyLen = other.m_bodyLen;
- memcpy(m_body, other.m_body, other.m_bodyLen);
-
- m_topic = other.m_topic;
- m_flag = other.m_flag;
- m_properties = other.m_properties;
- }
-
- return *this;
-}
-
-void Message::clearProperty(const std::string& name)
-{
- m_properties.erase(name);
-}
-
-void Message::putProperty(const std::string& name, const std::string& value)
-{
- m_properties[name] = value;
-}
-
-std::string Message::getProperty(const std::string& name)
-{
- std::map<std::string, std::string>::const_iterator it = m_properties.find(name);
- return (it == m_properties.end()) ? "" : it->second;
-}
-
-std::string Message::getTopic()const
-{
- return m_topic;
-}
-
-void Message::setTopic(const std::string& topic)
-{
- m_topic = topic;
-}
-
-std::string Message::getTags()
-{
- return getProperty(PROPERTY_TAGS);
-}
-
-void Message::setTags(const std::string& tags)
-{
- putProperty(PROPERTY_TAGS, tags);
-}
-
-std::string Message::getKeys()
-{
- return getProperty(PROPERTY_KEYS);
-}
-
-void Message::setKeys(const std::string& keys)
-{
- putProperty(PROPERTY_KEYS, keys);
-}
-
-void Message::setKeys(const std::list<std::string> keys)
-{
- if (keys.empty())
- {
- return;
- }
-
- std::list<std::string>::const_iterator it = keys.begin();
- std::string str;
- str += *it;
- it++;
-
- for (; it != keys.end(); it++)
- {
- str += KEY_SEPARATOR;
- str += *it;
- }
-
- setKeys(str);
-}
-
-int Message::getDelayTimeLevel()
-{
- std::string tmp = getProperty(PROPERTY_DELAY_TIME_LEVEL);
- if (!tmp.empty())
- {
- return atoi(tmp.c_str());
- }
-
- return 0;
-}
-
-void Message::setDelayTimeLevel(int level)
-{
- char tmp[16];
- snprintf(tmp, sizeof(tmp), "%d", level);
-
- putProperty(PROPERTY_DELAY_TIME_LEVEL, tmp);
-}
-
-bool Message::isWaitStoreMsgOK()
-{
- std::string tmp = getProperty(PROPERTY_WAIT_STORE_MSG_OK);
- if (tmp.empty())
- {
- return true;
- }
- else
- {
- return (tmp == "true") ? true : false;
- }
-}
-
-void Message::setWaitStoreMsgOK(bool waitStoreMsgOK)
-{
- if (waitStoreMsgOK)
- {
- putProperty(PROPERTY_WAIT_STORE_MSG_OK, "true");
- }
- else
- {
- putProperty(PROPERTY_WAIT_STORE_MSG_OK, "false");
- }
-}
-
-int Message::getFlag()
-{
- return m_flag;
-}
-
-void Message::setFlag(int flag)
-{
- m_flag = flag;
-}
-
-const char* Message::getBody()const
-{
- return m_body;
-}
-
-int Message::getBodyLen()const
-{
- return m_bodyLen;
-}
-
-void Message::setBody(const char* body, int len)
-{
- if (len > 0)
- {
- if (m_body)
- {
- free(m_body);
- m_body = NULL;
- m_bodyLen = 0;
- }
-
- m_body = (char*)malloc(len);
- m_bodyLen = len;
- memcpy(m_body, body, len);
- }
-}
-
-bool Message::tryToCompress(int compressLevel)
-{
- if (m_body != NULL)
- {
- if (m_compressBody)
- {
- free(m_compressBody);
- m_compressBody = NULL;
- m_compressBodyLen = 0;
- }
-
- unsigned char* pOut;
- int outLen = 0;
- if (UtilAll::compress(m_body, m_bodyLen, &pOut, &outLen, compressLevel))
- {
- m_compressBody = (char*)pOut;
- m_compressBodyLen = outLen;
- return true;
- }
- }
-
- return false;
-}
-
-
-const char* Message::getCompressBody() const
-{
- return m_compressBody;
-}
-
-int Message::getCompressBodyLen() const
-{
- return m_compressBodyLen;
-}
-
-
-
-std::map<std::string, std::string>& Message::getProperties()
-{
- return m_properties;
-}
-
-void Message::setProperties(const std::map<std::string, std::string>& properties)
-{
- m_properties = properties;
-}
-
-void Message::Init(const std::string& topic, const std::string& tags, const std::string& keys, const int flag, const char* body, int len, bool waitStoreMsgOK)
-{
- m_topic = topic;
- m_flag = flag;
-
- m_body = NULL;
- m_bodyLen = len;
-
- m_compressBody = NULL;
- m_compressBodyLen = 0;
-
- if (len > 0)
- {
- m_body = (char*)malloc(len);
- memcpy(m_body, body, len);
- }
-
- if (tags.length() > 0)
- {
- setTags(tags);
- }
-
- if (keys.length() > 0)
- {
- setKeys(keys);
- }
-
- setWaitStoreMsgOK(waitStoreMsgOK);
-}
-
-std::string Message::toString() const
-{
- std::stringstream ss;
- ss << "{m_topic=" << m_topic
- << ",m_flag=" << m_flag
- << ",properties=" << UtilAll::toString(m_properties)
- << ",m_bodyLen=" << m_bodyLen
- << "}";
- return ss.str();
-}
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/message/MessageDecoder.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/message/MessageDecoder.cpp b/rocketmq-client4cpp/src/message/MessageDecoder.cpp
deleted file mode 100755
index 338121e..0000000
--- a/rocketmq-client4cpp/src/message/MessageDecoder.cpp
+++ /dev/null
@@ -1,366 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#include "MessageDecoder.h"
-
-#include <string.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <sstream>
-#include "MessageExt.h"
-#include "MessageSysFlag.h"
-#include "UtilAll.h"
-
-namespace rmq
-{
-
-const char MessageDecoder::NAME_VALUE_SEPARATOR = 1;
-const char MessageDecoder::PROPERTY_SEPARATOR = 2;
-const int MessageDecoder::MSG_ID_LENGTH = 8 + 8;
-
-int MessageDecoder::MessageMagicCodePostion = 4;
-int MessageDecoder::MessageFlagPostion = 16;
-int MessageDecoder::MessagePhysicOffsetPostion = 28;
-int MessageDecoder::MessageStoreTimestampPostion = 56;
-
-std::string MessageDecoder::createMessageId(sockaddr& addr, long long offset)
-{
- struct sockaddr_in sa;
- memcpy(&sa, &addr, sizeof(sockaddr));
- sa.sin_family = AF_INET;
-
- int port = ntohs(sa.sin_port);
- port = htonl(port);
- int ip = sa.sin_addr.s_addr;
-
- unsigned char* buf = new unsigned char[MSG_ID_LENGTH];
- offset = h2nll(offset);
- memcpy(buf, &ip, 4);
- memcpy(buf + 4, &port, 4);
- memcpy(buf + 8, &offset, 8);
-
- char* str = new char[2 * MSG_ID_LENGTH + 1];
- memset(str, 0, 2 * MSG_ID_LENGTH + 1);
-
- for (int i = 0; i < MSG_ID_LENGTH; i++)
- {
- char tmp[3];
- tmp[2] = 0;
-
- snprintf(tmp, sizeof(tmp), "%02X", buf[i]);
- strncat(str, tmp, sizeof(tmp));
- }
-
- std::string ret = str;
-
- delete[] buf;
- delete[] str;
-
- return ret;
-}
-
-MessageId MessageDecoder::decodeMessageId(const std::string& msgId)
-{
- std::string ipstr = msgId.substr(0, 8);
- std::string portstr = msgId.substr(8, 8);
- std::string offsetstr = msgId.substr(16);
-
- char* end;
- int ipint = strtoul(ipstr.c_str(), &end, 16);
- int portint = strtoul(portstr.c_str(), &end, 16);
-
- long long offset = UtilAll::hexstr2ull(offsetstr.c_str());
-
- offset = n2hll(offset);
-
- portint = ntohl(portint);
- short port = portint;
-
- struct sockaddr_in sa;
- sa.sin_family = AF_INET;
- sa.sin_port = htons(port);
- sa.sin_addr.s_addr = ipint;
-
- sockaddr addr;
- memcpy(&addr, &sa, sizeof(sockaddr));
-
- MessageId id(addr, offset);
-
- return id;
-}
-
-MessageExt* MessageDecoder::decode(const char* pData, int len, int& offset)
-{
- return decode(pData, len, offset, true);
-}
-
-MessageExt* MessageDecoder::decode(const char* pData, int len, int& offset, bool readBody)
-{
- MessageExt* msgExt = NULL;
-
- try
- {
- msgExt = new MessageExt();
-
- // 1 TOTALSIZE
- int storeSize;
- memcpy(&storeSize, pData, 4);
- storeSize = ntohl(storeSize);
-
- msgExt->setStoreSize(storeSize);
-
- // 2 MAGICCODE sizeof(int)
-
- // 3 BODYCRC
- int bodyCRC;
- memcpy(&bodyCRC, pData + 2 * sizeof(int), 4);
- bodyCRC = ntohl(bodyCRC);
- msgExt->setBodyCRC(bodyCRC);
-
- // 4 QUEUEID
- int queueId;
- memcpy(&queueId, pData + 3 * sizeof(int), 4);
- queueId = ntohl(queueId);
- msgExt->setQueueId(queueId);
-
- // 5 FLAG
- int flag ;
-
- memcpy(&flag, pData + 4 * sizeof(int), 4);
- flag = ntohl(flag);
-
- msgExt->setFlag(flag);
-
- // 6 QUEUEOFFSET
- long long queueOffset;
- memcpy(&queueOffset, pData + 5 * sizeof(int), 8);
- queueOffset = n2hll(queueOffset);
- msgExt->setQueueOffset(queueOffset);
-
- // 7 PHYSICALOFFSET
- long long physicOffset;
-
- memcpy(&physicOffset, pData + 7 * sizeof(int), 8);
- physicOffset = n2hll(physicOffset);
- msgExt->setCommitLogOffset(physicOffset);
-
- // 8 SYSFLAG
- int sysFlag;
-
- memcpy(&sysFlag, pData + 9 * sizeof(int), 4);
- sysFlag = ntohl(sysFlag);
- msgExt->setSysFlag(sysFlag);
-
- // 9 BORNTIMESTAMP
- long long bornTimeStamp;
- memcpy(&bornTimeStamp, pData + 10 * sizeof(int), 8);
- bornTimeStamp = n2hll(bornTimeStamp);
-
- msgExt->setBornTimestamp(bornTimeStamp);
-
- // 10 BORNHOST
- int bornHost;//c0 a8 00 68 192.168.0.104 c0 a8 00 68 00 00 c4 04
- memcpy(&bornHost, pData + 12 * sizeof(int), 4);
-
- int port;
- memcpy(&port, pData + 13 * sizeof(int), 4);
- port = ntohl(port);
-
- struct sockaddr_in sa;
- sa.sin_family = AF_INET;
- sa.sin_port = htons(port);
- sa.sin_addr.s_addr = bornHost;
-
- sockaddr bornAddr;
- memcpy(&bornAddr, &sa, sizeof(sockaddr));
- msgExt->setBornHost(bornAddr);
-
- // 11 STORETIMESTAMP
- long long storeTimestamp;
- memcpy(&storeTimestamp, pData + 14 * sizeof(int), 8);
- storeTimestamp = n2hll(storeTimestamp);
- msgExt->setStoreTimestamp(storeTimestamp);
-
- // 12 STOREHOST
- int storeHost;
- memcpy(&storeHost, pData + 16 * sizeof(int), 4);
- memcpy(&port, pData + 17 * sizeof(int), 4);
- port = ntohl(port);
-
- sa.sin_family = AF_INET;
- sa.sin_port = htons(port);
- sa.sin_addr.s_addr = storeHost;
-
- sockaddr storeAddr;
- memcpy(&storeAddr, &sa, sizeof(sockaddr));
-
- msgExt->setStoreHost(storeAddr);
-
- // 13 RECONSUMETIMES
- int reconsumeTimes;
- memcpy(&reconsumeTimes, pData + 18 * sizeof(int), 4);
- reconsumeTimes = ntohl(reconsumeTimes);
- msgExt->setReconsumeTimes(reconsumeTimes);
-
- // 14 Prepared Transaction Offset
- long long preparedTransactionOffset;
- memcpy(&preparedTransactionOffset, pData + 19 * sizeof(int), 8);
- preparedTransactionOffset = n2hll(preparedTransactionOffset);
- msgExt->setPreparedTransactionOffset(preparedTransactionOffset);
-
- // 15 BODY
- int bodyLen = 0;
- memcpy(&bodyLen, pData + 21 * sizeof(int), 4);
- bodyLen = ntohl(bodyLen);
-
- if (bodyLen > 0)
- {
- if (readBody)
- {
- const char* body = pData + 22 * sizeof(int);
- int newBodyLen = bodyLen;
-
- // uncompress body
- if ((sysFlag & MessageSysFlag::CompressedFlag) == MessageSysFlag::CompressedFlag)
- {
- unsigned char* pOut;
- int outLen;
-
- if (UtilAll::decompress(body, bodyLen, &pOut, &outLen))
- {
- msgExt->setBody((char*)pOut, outLen);
- free(pOut);
- }
- else
- {
- msgExt->setBody(body, newBodyLen);
- }
- }
- else
- {
- msgExt->setBody(body, newBodyLen);
- }
- }
- else
- {
-
- }
- }
-
- // 16 TOPIC
- int topicLen = *(pData + 22 * sizeof(int) + bodyLen);
-
- char* tmp = new char[topicLen + 1];
-
- memcpy(tmp, pData + 22 * sizeof(int) + bodyLen + 1, topicLen);
- tmp[topicLen] = 0;
- std::string topic = tmp;
-
- delete[] tmp;
-
- msgExt->setTopic(topic);
-
- // 17 properties
- short propertiesLength;
- memcpy(&propertiesLength, pData + 22 * sizeof(int) + bodyLen + 1 + topicLen, 2);
- propertiesLength = ntohs(propertiesLength);
-
- if (propertiesLength > 0)
- {
- char* properties = new char[propertiesLength + 1];
- memcpy(properties, pData + 22 * sizeof(int) + bodyLen + 1 + topicLen + 2, propertiesLength);
- properties[propertiesLength] = 0;
- std::string propertiesString = properties;
- std::map<std::string, std::string> map;
- string2messageProperties(map, propertiesString);
- msgExt->setProperties(map);
- delete[] properties;
- }
-
- offset = 22 * sizeof(int) + bodyLen + 1 + topicLen + 2 + propertiesLength;
-
- // ��ϢID
- std::string msgId = createMessageId(storeAddr, physicOffset);
- msgExt->setMsgId(msgId);
-
- return msgExt;
- }
- catch (...)
- {
- RMQ_ERROR("decode exception");
- if (msgExt)
- {
- delete msgExt;
- msgExt = NULL;
- }
- }
-
- return NULL;
-}
-
-std::list<MessageExt*> MessageDecoder::decodes(const char* pData, int len)
-{
- return decodes(pData, len, true);
-}
-
-std::list<MessageExt*> MessageDecoder::decodes(const char* pData, int len, bool readBody)
-{
- std::list<MessageExt*> list;
-
- int offset = 0;
- while (offset < len)
- {
- int tmp;
- MessageExt* msg = decode(pData + offset, len, tmp);
- list.push_back(msg);
- offset += tmp;
- }
-
- return list;
-}
-
-std::string MessageDecoder::messageProperties2String(const std::map<std::string, std::string>& properties)
-{
- std::stringstream ss;
-
- std::map<std::string, std::string>::const_iterator it = properties.begin();
-
- for (; it != properties.end(); it++)
- {
- ss << it->first << NAME_VALUE_SEPARATOR << it->second << PROPERTY_SEPARATOR;
- }
-
- return ss.str();
-}
-
-void MessageDecoder::string2messageProperties(std::map<std::string, std::string>& properties,
- std::string& propertiesString)
-{
- std::vector<std::string> out;
- UtilAll::Split(out, propertiesString, PROPERTY_SEPARATOR);
-
- for (size_t i = 0; i < out.size(); i++)
- {
- std::vector<std::string> outValue;
- UtilAll::Split(outValue, out[i], NAME_VALUE_SEPARATOR);
-
- if (outValue.size() == 2)
- {
- properties[outValue[0]] = outValue[1];
- }
- }
-}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/message/MessageDecoder.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/message/MessageDecoder.h b/rocketmq-client4cpp/src/message/MessageDecoder.h
deleted file mode 100755
index a5f24ed..0000000
--- a/rocketmq-client4cpp/src/message/MessageDecoder.h
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __MESSAGEDECODER_H__
-#define __MESSAGEDECODER_H__
-
-#include <string>
-#include <list>
-#include <map>
-
-#include "SocketUtil.h"
-#include "MessageId.h"
-
-namespace rmq
-{
- class MessageExt;
- class UnknownHostException;
-
- /**
- * Message decoder
- *
- */
- class MessageDecoder
- {
- public:
- static std::string createMessageId(sockaddr& addr, long long offset);
- static MessageId decodeMessageId(const std::string& msgId);
-
- static MessageExt* decode(const char* pData, int len, int& offset);
- static MessageExt* decode(const char* pData, int len, int& offset, bool readBody);
-
- static std::list<MessageExt*> decodes(const char* pData, int len);
- static std::list<MessageExt*> decodes(const char* pData, int len, bool readBody);
-
- static std::string messageProperties2String(const std::map<std::string, std::string>& properties);
- static void string2messageProperties(std::map<std::string, std::string>& properties,
- std::string& propertiesString);
-
- public:
- static const char NAME_VALUE_SEPARATOR;
- static const char PROPERTY_SEPARATOR;
-
- static const int MSG_ID_LENGTH;
-
- static int MessageMagicCodePostion;
- static int MessageFlagPostion;
- static int MessagePhysicOffsetPostion;
- static int MessageStoreTimestampPostion;
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/message/MessageExt.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/message/MessageExt.cpp b/rocketmq-client4cpp/src/message/MessageExt.cpp
deleted file mode 100755
index 35479ce..0000000
--- a/rocketmq-client4cpp/src/message/MessageExt.cpp
+++ /dev/null
@@ -1,244 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#include "MessageExt.h"
-
-#include <sstream>
-#include "MessageSysFlag.h"
-#include "SocketUtil.h"
-
-namespace rmq
-{
-
-MessageExt::MessageExt()
- : m_queueOffset(0),
- m_commitLogOffset(0),
- m_bornTimestamp(0),
- m_storeTimestamp(0),
- m_preparedTransactionOffset(0),
- m_queueId(0),
- m_storeSize(0),
- m_sysFlag(0),
- m_bodyCRC(0),
- m_reconsumeTimes(3),
- m_msgId("")
-{
-}
-
-MessageExt::MessageExt(int queueId,
- long long bornTimestamp,
- sockaddr bornHost,
- long long storeTimestamp,
- sockaddr storeHost,
- std::string msgId)
- : m_queueOffset(0),
- m_commitLogOffset(0),
- m_bornTimestamp(bornTimestamp),
- m_storeTimestamp(storeTimestamp),
- m_preparedTransactionOffset(0),
- m_queueId(queueId),
- m_storeSize(0),
- m_sysFlag(0),
- m_bodyCRC(0),
- m_reconsumeTimes(3),
- m_bornHost(bornHost),
- m_storeHost(storeHost),
- m_msgId(msgId)
-{
-
-}
-
-MessageExt::~MessageExt()
-{
-
-}
-
-int MessageExt::getQueueId()
-{
- return m_queueId;
-}
-
-void MessageExt::setQueueId(int queueId)
-{
- m_queueId = queueId;
-}
-
-long long MessageExt::getBornTimestamp()
-{
- return m_bornTimestamp;
-}
-
-void MessageExt::setBornTimestamp(long long bornTimestamp)
-{
- m_bornTimestamp = bornTimestamp;
-}
-
-sockaddr MessageExt::getBornHost()
-{
- return m_bornHost;
-}
-
-std::string MessageExt::getBornHostString()
-{
- return socketAddress2String(m_bornHost);
-}
-
-std::string MessageExt::getBornHostNameString()
-{
- return getHostName(m_bornHost);
-}
-
-void MessageExt::setBornHost(const sockaddr& bornHost)
-{
- m_bornHost = bornHost;
-}
-
-long long MessageExt::getStoreTimestamp()
-{
- return m_storeTimestamp;
-}
-
-void MessageExt::setStoreTimestamp(long long storeTimestamp)
-{
- m_storeTimestamp = storeTimestamp;
-}
-
-sockaddr MessageExt::getStoreHost()
-{
- return m_storeHost;
-}
-
-std::string MessageExt::getStoreHostString()
-{
- return socketAddress2String(m_storeHost);
-}
-
-void MessageExt::setStoreHost(const sockaddr& storeHost)
-{
- m_storeHost = storeHost;
-}
-
-std::string MessageExt::getMsgId()
-{
- return m_msgId;
-}
-
-void MessageExt::setMsgId(const std::string& msgId)
-{
- m_msgId = msgId;
-}
-
-int MessageExt::getSysFlag()
-{
- return m_sysFlag;
-}
-
-void MessageExt::setSysFlag(int sysFlag)
-{
- m_sysFlag = sysFlag;
-}
-
-int MessageExt::getBodyCRC()
-{
- return m_bodyCRC;
-}
-
-void MessageExt::setBodyCRC(int bodyCRC)
-{
- m_bodyCRC = bodyCRC;
-}
-
-long long MessageExt::getQueueOffset()
-{
- return m_queueOffset;
-}
-
-void MessageExt::setQueueOffset(long long queueOffset)
-{
- m_queueOffset = queueOffset;
-}
-
-long long MessageExt::getCommitLogOffset()
-{
- return m_commitLogOffset;
-}
-
-void MessageExt::setCommitLogOffset(long long physicOffset)
-{
- m_commitLogOffset = physicOffset;
-}
-
-int MessageExt::getStoreSize()
-{
- return m_storeSize;
-}
-
-void MessageExt::setStoreSize(int storeSize)
-{
- m_storeSize = storeSize;
-}
-
-TopicFilterType MessageExt::parseTopicFilterType(int sysFlag)
-{
- if ((sysFlag & MessageSysFlag::MultiTagsFlag) == MessageSysFlag::MultiTagsFlag)
- {
- return MULTI_TAG;
- }
-
- return SINGLE_TAG;
-}
-
-int MessageExt::getReconsumeTimes()
-{
- return m_reconsumeTimes;
-}
-
-void MessageExt::setReconsumeTimes(int reconsumeTimes)
-{
- m_reconsumeTimes = reconsumeTimes;
-}
-
-long long MessageExt::getPreparedTransactionOffset()
-{
- return m_preparedTransactionOffset;
-}
-
-void MessageExt::setPreparedTransactionOffset(long long preparedTransactionOffset)
-{
- m_preparedTransactionOffset = preparedTransactionOffset;
-}
-
-std::string MessageExt::toString() const
-{
- std::stringstream ss;
- ss << "{msgId=" << m_msgId
- << ",queueId=" << m_queueId
- << ",storeSize=" << m_storeSize
- << ",sysFlag=" << m_sysFlag
- << ",queueOffset=" << m_queueOffset
- << ",commitLogOffset=" << m_commitLogOffset
- << ",preparedTransactionOffset=" << m_preparedTransactionOffset
- << ",bornTimestamp=" << m_bornTimestamp
- << ",bornHost=" << socketAddress2String(m_bornHost)
- << ",storeHost=" << socketAddress2String(m_storeHost)
- << ",storeTimestamp=" << m_storeTimestamp
- << ",reconsumeTimes=" << m_reconsumeTimes
- << ",bodyCRC=" << m_bodyCRC
- << ",Message=" << Message::toString()
- << "}";
- return ss.str();
-}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/message/MessageId.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/message/MessageId.h b/rocketmq-client4cpp/src/message/MessageId.h
deleted file mode 100644
index 5237f8d..0000000
--- a/rocketmq-client4cpp/src/message/MessageId.h
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __MESSAGEID_H__
-#define __MESSAGEID_H__
-
-#include "SocketUtil.h"
-
-namespace rmq
-{
- class MessageId
- {
- public:
- MessageId(sockaddr address, long long offset)
- : m_address(address), m_offset(offset)
- {
-
- }
-
- sockaddr getAddress()
- {
- return m_address;
- }
-
- void setAddress(sockaddr address)
- {
- m_address = address;
- }
-
- long long getOffset()
- {
- return m_offset;
- }
-
- void setOffset(long long offset)
- {
- m_offset = offset;
- }
-
- private:
- sockaddr m_address;
- long long m_offset;
- };
-}
-
-#endif