You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by vo...@apache.org on 2017/09/04 06:44:50 UTC
[16/17] incubator-rocketmq-externals git commit: Polish cpp module
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/DefaultMQPullConsumer.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/DefaultMQPullConsumer.h b/rocketmq-client4cpp/include/DefaultMQPullConsumer.h
deleted file mode 100755
index d9952c5..0000000
--- a/rocketmq-client4cpp/include/DefaultMQPullConsumer.h
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __RMQ_DEFAULTMQPULLCONSUMER_H__
-#define __RMQ_DEFAULTMQPULLCONSUMER_H__
-
-#include <list>
-#include <string>
-
-#include "RocketMQClient.h"
-#include "MQClientException.h"
-#include "MessageQueue.h"
-#include "MessageExt.h"
-#include "ClientConfig.h"
-#include "MQPullConsumer.h"
-
-namespace rmq
-{
- class OffsetStore;
- class DefaultMQPullConsumerImpl;
- class AllocateMessageQueueStrategy;
-
- /**
- * Pull Consumer
- *
- */
- class DefaultMQPullConsumer : public ClientConfig , public MQPullConsumer
- {
- public:
- DefaultMQPullConsumer();
- DefaultMQPullConsumer(const std::string& consumerGroup);
- ~DefaultMQPullConsumer();
-
- //MQAdmin
- void createTopic(const std::string& key, const std::string& newTopic, int queueNum);
- long long searchOffset(const MessageQueue& mq, long long timestamp);
- long long maxOffset(const MessageQueue& mq);
- long long minOffset(const MessageQueue& mq);
- long long earliestMsgStoreTime(const MessageQueue& mq);
- MessageExt* viewMessage(const std::string& msgId);
- QueryResult queryMessage(const std::string& topic,
- const std::string& key,
- int maxNum,
- long long begin,
- long long end);
- // MQadmin end
-
- AllocateMessageQueueStrategy* getAllocateMessageQueueStrategy();
- void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy);
- int getBrokerSuspendMaxTimeMillis() ;
- void setBrokerSuspendMaxTimeMillis(int brokerSuspendMaxTimeMillis);
- std::string getConsumerGroup();
- void setConsumerGroup(const std::string& consumerGroup);
- int getConsumerPullTimeoutMillis();
- void setConsumerPullTimeoutMillis(int consumerPullTimeoutMillis);
- int getConsumerTimeoutMillisWhenSuspend() ;
- void setConsumerTimeoutMillisWhenSuspend(int consumerTimeoutMillisWhenSuspend);
- MessageModel getMessageModel();
- void setMessageModel(MessageModel messageModel);
- MessageQueueListener* getMessageQueueListener();
- void setMessageQueueListener(MessageQueueListener* pMessageQueueListener);
- std::set<std::string> getRegisterTopics();
- void setRegisterTopics( std::set<std::string> registerTopics);
-
- //MQConsumer
- void sendMessageBack(MessageExt& msg, int delayLevel);
- void sendMessageBack(MessageExt& msg, int delayLevel, const std::string& brokerName);
- std::set<MessageQueue>* fetchSubscribeMessageQueues(const std::string& topic);
- void start();
- void shutdown() ;
- //MQConsumer end
-
- //MQPullConsumer
- void registerMessageQueueListener(const std::string& topic, MessageQueueListener* pListener);
- PullResult* pull(MessageQueue& mq, const std::string& subExpression, long long offset,int maxNums);
- void pull(MessageQueue& mq,
- const std::string& subExpression,
- long long offset,
- int maxNums,
- PullCallback* pPullCallback);
-
- PullResult* pullBlockIfNotFound(MessageQueue& mq,
- const std::string& subExpression,
- long long offset,
- int maxNums);
-
- void pullBlockIfNotFound(MessageQueue& mq,
- const std::string& subExpression,
- long long offset,
- int maxNums,
- PullCallback* pPullCallback);
-
- void updateConsumeOffset(MessageQueue& mq, long long offset);
-
- long long fetchConsumeOffset(MessageQueue& mq, bool fromStore);
-
- std::set<MessageQueue>* fetchMessageQueuesInBalance(const std::string& topic);
- //MQPullConsumer end
-
- OffsetStore* getOffsetStore();
- void setOffsetStore(OffsetStore* offsetStore);
-
- DefaultMQPullConsumerImpl* getDefaultMQPullConsumerImpl();
-
- bool isUnitMode();
- void setUnitMode(bool isUnitMode);
-
- int getMaxReconsumeTimes();
- void setMaxReconsumeTimes(int maxReconsumeTimes);
-
- protected:
- DefaultMQPullConsumerImpl* m_pDefaultMQPullConsumerImpl;
-
- private:
- std::string m_consumerGroup;
- int m_brokerSuspendMaxTimeMillis ;
-
- int m_consumerTimeoutMillisWhenSuspend;
- int m_consumerPullTimeoutMillis;
-
- MessageModel m_messageModel;
- MessageQueueListener* m_pMessageQueueListener;
-
- OffsetStore* m_pOffsetStore;
-
- std::set<std::string> m_registerTopics;
- AllocateMessageQueueStrategy* m_pAllocateMessageQueueStrategy;
-
- /**
- * Whether the unit of subscription group
- */
- bool m_unitMode;
-
- /**
- * max retry times, default is 15
- */
- int m_maxReconsumeTimes;
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/DefaultMQPushConsumer.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/DefaultMQPushConsumer.h b/rocketmq-client4cpp/include/DefaultMQPushConsumer.h
deleted file mode 100755
index 25ef4fb..0000000
--- a/rocketmq-client4cpp/include/DefaultMQPushConsumer.h
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __RMQ_DEFAULTMQPUSHCONSUMER_H__
-#define __RMQ_DEFAULTMQPUSHCONSUMER_H__
-
-#include <list>
-#include <string>
-
-#include "RocketMQClient.h"
-#include "MQClientException.h"
-#include "Message.h"
-#include "MessageExt.h"
-#include "MessageQueue.h"
-#include "MessageListener.h"
-#include "PullResult.h"
-#include "ClientConfig.h"
-#include "MQPushConsumer.h"
-
-namespace rmq
-{
- class AllocateMessageQueueStrategy;
- class DefaultMQPushConsumerImpl;
- class OffsetStore;
-
- /**
- * Push Consumer
- *
- */
- class DefaultMQPushConsumer : public ClientConfig ,public MQPushConsumer
- {
- public:
- DefaultMQPushConsumer();
- DefaultMQPushConsumer(const std::string& consumerGroup);
- ~DefaultMQPushConsumer();
-
- //MQAdmin
- void createTopic(const std::string& key, const std::string& newTopic, int queueNum);
- long long searchOffset(const MessageQueue& mq, long long timestamp);
- long long maxOffset(const MessageQueue& mq);
- long long minOffset(const MessageQueue& mq);
- long long earliestMsgStoreTime(const MessageQueue& mq);
- MessageExt* viewMessage(const std::string& msgId);
- QueryResult queryMessage(const std::string& topic,
- const std::string& key,
- int maxNum,
- long long begin,
- long long end);
-
- // MQadmin end
-
- AllocateMessageQueueStrategy* getAllocateMessageQueueStrategy();
- void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy);
-
- int getConsumeConcurrentlyMaxSpan();
- void setConsumeConcurrentlyMaxSpan(int consumeConcurrentlyMaxSpan);
-
- ConsumeFromWhere getConsumeFromWhere();
- void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere);
-
- int getConsumeMessageBatchMaxSize();
- void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize);
-
- std::string getConsumerGroup();
- void setConsumerGroup(const std::string& consumerGroup) ;
-
- int getConsumeThreadMax() ;
- void setConsumeThreadMax(int consumeThreadMax);
-
- int getConsumeThreadMin();
- void setConsumeThreadMin(int consumeThreadMin);
-
- MessageListener* getMessageListener();
- void setMessageListener(MessageListener* pMessageListener);
-
- MessageModel getMessageModel();
- void setMessageModel(MessageModel messageModel) ;
-
- int getPullBatchSize() ;
- void setPullBatchSize(int pullBatchSize);
-
- long getPullInterval();
- void setPullInterval(long pullInterval);
-
- int getPullThresholdForQueue();
- void setPullThresholdForQueue(int pullThresholdForQueue);
-
- std::map<std::string, std::string>& getSubscription();
- void setSubscription(const std::map<std::string, std::string>& subscription);
-
- //MQConsumer
- void sendMessageBack(MessageExt& msg, int delayLevel);
- void sendMessageBack(MessageExt& msg, int delayLevel, const std::string brokerName);
- std::set<MessageQueue>* fetchSubscribeMessageQueues(const std::string& topic);
-
- void start();
- void shutdown();
- //MQConsumer end
-
- //MQPushConsumer
- void registerMessageListener(MessageListener* pMessageListener);
-
- void subscribe(const std::string& topic, const std::string& subExpression);
- void unsubscribe(const std::string& topic);
-
- void updateCorePoolSize(int corePoolSize);
-
- void suspend() ;
- void resume();
- //MQPushConsumer end
-
- OffsetStore* getOffsetStore();
- void setOffsetStore(OffsetStore* offsetStore);
-
- std::string getConsumeTimestamp();
- void setConsumeTimestamp(std::string consumeTimestamp);
-
- DefaultMQPushConsumerImpl* getDefaultMQPushConsumerImpl();
-
- bool isPostSubscriptionWhenPull();
- void setPostSubscriptionWhenPull(bool postSubscriptionWhenPull);
-
- bool isUnitMode();
- void setUnitMode(bool isUnitMode);
-
- int getMaxReconsumeTimes();
- void setMaxReconsumeTimes(int maxReconsumeTimes);
-
- int getSuspendCurrentQueueTimeMillis();
- void setSuspendCurrentQueueTimeMillis(int suspendCurrentQueueTimeMillis);
-
- int getConsumeTimeout();
- void setConsumeTimeout(int consumeTimeout);
-
- protected:
- DefaultMQPushConsumerImpl* m_pDefaultMQPushConsumerImpl;
-
- private:
- std::string m_consumerGroup;
- MessageModel m_messageModel;
- ConsumeFromWhere m_consumeFromWhere;
- std::string m_consumeTimestamp;
-
- AllocateMessageQueueStrategy* m_pAllocateMessageQueueStrategy ;
- std::map<std::string /* topic */, std::string /* sub expression */> m_subscription ;
-
- MessageListener* m_pMessageListener;
- OffsetStore* m_pOffsetStore;
-
- int m_consumeThreadMin;
- int m_consumeThreadMax;
-
- int m_consumeConcurrentlyMaxSpan;
- int m_pullThresholdForQueue;
- long m_pullInterval;
-
- int m_consumeMessageBatchMaxSize;
- int m_pullBatchSize;
-
- bool m_postSubscriptionWhenPull;
- bool m_unitMode;
- int m_maxReconsumeTimes;
-
- long m_suspendCurrentQueueTimeMillis;
- long m_consumeTimeout;
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/MQAdmin.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MQAdmin.h b/rocketmq-client4cpp/include/MQAdmin.h
deleted file mode 100755
index 552a468..0000000
--- a/rocketmq-client4cpp/include/MQAdmin.h
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __RMQ_MQADMIN_H__
-#define __RMQ_MQADMIN_H__
-
-#include <string>
-
-#include "RocketMQClient.h"
-#include "MessageExt.h"
-
-namespace rmq
-{
- class MQClientException;
- class RemotingException;
- class MQBrokerException;
- class InterruptedException;
- class MessageQueue;
- class QueryResult;
-
- /**
- * MQ Admin
- *
- */
- class MQAdmin
- {
- public:
- MQAdmin()
- {
- }
-
- virtual ~MQAdmin()
- {
- }
-
- virtual void createTopic(const std::string& key, const std::string& newTopic, int queueNum)=0;
-
- virtual long long searchOffset(const MessageQueue& mq, long long timestamp)=0;
- virtual long long maxOffset(const MessageQueue& mq)=0;
- virtual long long minOffset(const MessageQueue& mq)=0;
-
- virtual long long earliestMsgStoreTime(const MessageQueue& mq)=0;
-
- virtual MessageExt* viewMessage(const std::string& msgId)=0;
- virtual QueryResult queryMessage(const std::string& topic,
- const std::string& key,
- int maxNum,
- long long begin,
- long long end)=0;
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/MQClientException.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MQClientException.h b/rocketmq-client4cpp/include/MQClientException.h
deleted file mode 100755
index f1d1d04..0000000
--- a/rocketmq-client4cpp/include/MQClientException.h
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Copyright (C) 2013 kangliqiang ,kangliq@163.com
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __RMQ_MQCLIENTEXCEPTION_H__
-#define __RMQ_MQCLIENTEXCEPTION_H__
-
-#include <string>
-#include <ostream>
-#include <sstream>
-#include <exception>
-
-#include "RocketMQClient.h"
-
-namespace rmq
-{
- class MQException : public std::exception
- {
- public:
- MQException(const std::string& msg, int error,const char* file,int line)throw()
- : m_error(error),m_line(line),m_file(file)
- {
- try
- {
- std::stringstream ss;
- ss << "[" << file << ":" << line <<"]|error: " << error << "|msg:" << msg;
- m_msg = ss.str();
- }
- catch (...)
- {
- }
- }
-
- virtual ~MQException()throw()
- {
- }
-
- const char* what() const throw()
- {
- return m_msg.c_str();
- }
-
- int GetError() const throw()
- {
- return m_error;
- }
-
- virtual const char* GetType() const throw()
- {
- return "MQException";
- }
-
- protected:
- int m_error;
- int m_line;
- std::string m_msg;
- std::string m_file;
- };
-
- inline std::ostream& operator<<(std::ostream& os, const MQException& e)
- {
- os <<"Type:"<<e.GetType() << e.what();
- return os;
- }
-
- #define DEFINE_MQCLIENTEXCEPTION(name, parent) \
- class name : public parent \
- {\
- public:\
- name(const std::string& msg, int error,const char* file,int line) throw ()\
- : parent(msg, error, file, line) {}\
- virtual const char* GetType() const throw()\
- {\
- return #name;\
- }\
- };
-
- DEFINE_MQCLIENTEXCEPTION(MQClientException, MQException)
- DEFINE_MQCLIENTEXCEPTION(MQBrokerException, MQException)
- DEFINE_MQCLIENTEXCEPTION(InterruptedException, MQException)
- DEFINE_MQCLIENTEXCEPTION(UnknownHostException, MQException)
-
- DEFINE_MQCLIENTEXCEPTION(RemotingException, MQException)
- DEFINE_MQCLIENTEXCEPTION(RemotingCommandException, RemotingException)
- DEFINE_MQCLIENTEXCEPTION(RemotingConnectException, RemotingException)
- DEFINE_MQCLIENTEXCEPTION(RemotingSendRequestException, RemotingException)
- DEFINE_MQCLIENTEXCEPTION(RemotingTimeoutException, RemotingException)
- DEFINE_MQCLIENTEXCEPTION(RemotingTooMuchRequestException, RemotingException)
-
- #define THROW_MQEXCEPTION(e,msg,err) throw e(msg,err,__FILE__,__LINE__)
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/MQConsumer.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MQConsumer.h b/rocketmq-client4cpp/include/MQConsumer.h
deleted file mode 100755
index 87efe97..0000000
--- a/rocketmq-client4cpp/include/MQConsumer.h
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __RMQ_MQCONSUMER_H__
-#define __RMQ_MQCONSUMER_H__
-
-#include <set>
-#include <string>
-
-#include "RocketMQClient.h"
-#include "MQAdmin.h"
-#include "ConsumeType.h"
-
-
-namespace rmq
-{
- class MessageExt;
-
- /**
- * Consumer interface
- *
- */
- class MQConsumer : public MQAdmin
- {
- public:
- virtual ~MQConsumer(){}
-
- virtual void start()=0;
- virtual void shutdown()=0;
-
- virtual void sendMessageBack(MessageExt& msg, int delayLevel)=0;
- virtual std::set<MessageQueue>* fetchSubscribeMessageQueues(const std::string& topic)=0;
- };
-}
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/MQProducer.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MQProducer.h b/rocketmq-client4cpp/include/MQProducer.h
deleted file mode 100755
index b353aba..0000000
--- a/rocketmq-client4cpp/include/MQProducer.h
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __RMQ_MQPRODUCER_H__
-#define __RMQ_MQPRODUCER_H__
-
-#include <vector>
-#include <string>
-
-#include "RocketMQClient.h"
-#include "MQAdmin.h"
-#include "SendResult.h"
-
-namespace rmq
-{
- class MessageQueue;
- class SendCallback;
- class LocalTransactionExecuter;
- class MessageQueueSelector;
-
- /**
- * Producer interface
- *
- */
- class MQProducer : public MQAdmin
- {
- public:
- MQProducer()
- {
- }
-
- virtual ~MQProducer()
- {
- }
-
- virtual void start()=0;
- virtual void shutdown()=0;
-
- virtual std::vector<MessageQueue>* fetchPublishMessageQueues(const std::string& topic)=0;
-
- virtual SendResult send(Message& msg)=0;
- virtual void send(Message& msg, SendCallback* sendCallback)=0;
- virtual void sendOneway(Message& msg)=0;
-
- virtual SendResult send(Message& msg, MessageQueue& mq)=0;
- virtual void send(Message& msg, MessageQueue& mq, SendCallback* sendCallback)=0;
- virtual void sendOneway(Message& msg, MessageQueue& mq)=0;
-
- virtual SendResult send(Message& msg, MessageQueueSelector* selector, void* arg)=0;
- virtual void send(Message& msg, MessageQueueSelector* selector, void* arg, SendCallback* sendCallback)=0;
- virtual void sendOneway(Message& msg, MessageQueueSelector* selector, void* arg)=0;
-
- virtual TransactionSendResult sendMessageInTransaction(Message& msg,
- LocalTransactionExecuter* tranExecuter,
- void* arg)=0;
- };
-}
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/MQPullConsumer.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MQPullConsumer.h b/rocketmq-client4cpp/include/MQPullConsumer.h
deleted file mode 100755
index ffb2ac5..0000000
--- a/rocketmq-client4cpp/include/MQPullConsumer.h
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __RMQ_MQPULLCONSUMER_H__
-#define __RMQ_MQPULLCONSUMER_H__
-
-#include <set>
-#include <string>
-
-#include "RocketMQClient.h"
-#include "MQConsumer.h"
-#include "PullResult.h"
-
-namespace rmq
-{
- class MessageQueueListener;
- class MessageQueue;
- class PullCallback;
-
- /**
- * Pull Consumer
- *
- */
- class MQPullConsumer : public MQConsumer
- {
- public:
- virtual ~MQPullConsumer(){}
- virtual void registerMessageQueueListener(const std::string& topic, MessageQueueListener* pListener)=0;
-
- virtual PullResult* pull(MessageQueue& mq, const std::string& subExpression, long long offset,int maxNums)=0;
- virtual void pull(MessageQueue& mq, const std::string& subExpression, long long offset, int maxNums, PullCallback* pPullCallback)=0;
-
- virtual PullResult* pullBlockIfNotFound(MessageQueue& mq, const std::string& subExpression, long long offset, int maxNums)=0;
- virtual void pullBlockIfNotFound(MessageQueue& mq, const std::string& subExpression, long long offset, int maxNums, PullCallback* pPullCallback)=0;
-
- virtual void updateConsumeOffset(MessageQueue& mq, long long offset)=0;
- virtual long long fetchConsumeOffset(MessageQueue& mq, bool fromStore)=0;
-
- virtual std::set<MessageQueue>* fetchMessageQueuesInBalance(const std::string& topic)=0;
- };
-}
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/MQPushConsumer.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MQPushConsumer.h b/rocketmq-client4cpp/include/MQPushConsumer.h
deleted file mode 100755
index fe6d4a0..0000000
--- a/rocketmq-client4cpp/include/MQPushConsumer.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __RMQ_MQPUSHCONSUMER_H__
-#define __RMQ_MQPUSHCONSUMER_H__
-
-#include <set>
-#include <string>
-
-#include "RocketMQClient.h"
-#include "MQConsumer.h"
-#include "PullResult.h"
-
-namespace rmq
-{
- class MessageListener;
-
- /**
- * Push Consumer
- *
- */
- class MQPushConsumer : public MQConsumer
- {
- public:
- virtual void registerMessageListener(MessageListener* pMessageListener)=0;
-
-
- virtual void subscribe(const std::string& topic, const std::string& subExpression)=0;
- virtual void unsubscribe(const std::string& topic)=0;
-
-
- virtual void updateCorePoolSize(int corePoolSize)=0;
- virtual void suspend()=0;
- virtual void resume()=0;
- };
-}
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/Message.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/Message.h b/rocketmq-client4cpp/include/Message.h
deleted file mode 100755
index 441b4e5..0000000
--- a/rocketmq-client4cpp/include/Message.h
+++ /dev/null
@@ -1,136 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __RMQ_MESSAGE_H__
-#define __RMQ_MESSAGE_H__
-
-#include <map>
-#include <string>
-#include <list>
-#include "RocketMQClient.h"
-
-namespace rmq
-{
- /**
- * Message
- *
- */
- class Message
- {
- public:
- Message();
- Message(const std::string& topic, const char* body,int len);
- Message(const std::string& topic, const std::string& tags, const char* body,int len);
- Message(const std::string& topic, const std::string& tags,const std::string& keys, const char* body,int len);
- Message(const std::string& topic,
- const std::string& tags,
- const std::string& keys,
- const int flag,
- const char* body,
- int len,
- bool waitStoreMsgOK);
-
- virtual ~Message();
- Message(const Message& other);
- Message& operator=(const Message& other);
-
- void clearProperty(const std::string& name);
- void putProperty(const std::string& name, const std::string& value);
- std::string getProperty(const std::string& name);
-
- std::string getTopic()const;
- void setTopic(const std::string& topic);
-
- std::string getTags();
- void setTags(const std::string& tags);
-
- std::string getKeys();
- void setKeys(const std::string& keys);
- void setKeys(const std::list<std::string> keys);
-
- int getDelayTimeLevel();
- void setDelayTimeLevel(int level);
-
- bool isWaitStoreMsgOK();
- void setWaitStoreMsgOK(bool waitStoreMsgOK);
-
- int getFlag();
- void setFlag(int flag);
-
- const char* getBody() const;
- int getBodyLen() const;
- void setBody(const char* body, int len);
-
- bool tryToCompress(int compressLevel);
- const char* getCompressBody() const;
- int getCompressBodyLen() const;
-
- std::map<std::string, std::string>& getProperties();
- void setProperties(const std::map<std::string, std::string>& properties);
-
- std::string toString() const;
-
- protected:
- void Init(const std::string& topic,
- const std::string& tags,
- const std::string& keys,
- const int flag,
- const char* body,
- int len,
- bool waitStoreMsgOK);
-
- public:
- static const std::string PROPERTY_KEYS;
- static const std::string PROPERTY_TAGS;
- static const std::string PROPERTY_WAIT_STORE_MSG_OK;
- static const std::string PROPERTY_DELAY_TIME_LEVEL;
-
- /**
- * for inner use
- */
- static const std::string PROPERTY_RETRY_TOPIC;
- static const std::string PROPERTY_REAL_TOPIC;
- static const std::string PROPERTY_REAL_QUEUE_ID;
- static const std::string PROPERTY_TRANSACTION_PREPARED;
- static const std::string PROPERTY_PRODUCER_GROUP;
- static const std::string PROPERTY_MIN_OFFSET;
- static const std::string PROPERTY_MAX_OFFSET;
- static const std::string PROPERTY_BUYER_ID;
- static const std::string PROPERTY_ORIGIN_MESSAGE_ID;
- static const std::string PROPERTY_TRANSFER_FLAG;
- static const std::string PROPERTY_CORRECTION_FLAG;
- static const std::string PROPERTY_MQ2_FLAG;
- static const std::string PROPERTY_RECONSUME_TIME;
- static const std::string PROPERTY_MSG_REGION;
- static const std::string PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX;
- static const std::string PROPERTY_MAX_RECONSUME_TIMES;
- static const std::string PROPERTY_CONSUME_START_TIMESTAMP;
-
- static const std::string KEY_SEPARATOR;
- private:
- std::string m_topic;
- int m_flag;
- std::map<std::string, std::string> m_properties;
-
- char* m_body;
- int m_bodyLen;
-
- char* m_compressBody;
- int m_compressBodyLen;
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/MessageExt.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MessageExt.h b/rocketmq-client4cpp/include/MessageExt.h
deleted file mode 100755
index f70041c..0000000
--- a/rocketmq-client4cpp/include/MessageExt.h
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __RMQ_MESSAGEEXT_H__
-#define __RMQ_MESSAGEEXT_H__
-
-#include <sys/socket.h>
-#include <string>
-#include "Message.h"
-#include "TopicFilterType.h"
-#include "RocketMQClient.h"
-
-namespace rmq
- {
- /**
- * Message extend
- *
- */
- class MessageExt : public Message
- {
- public:
- MessageExt();
-
- MessageExt(int queueId,
- long long bornTimestamp,
- sockaddr bornHost,
- long long storeTimestamp,
- sockaddr storeHost,
- std::string msgId);
-
- ~MessageExt();
-
- static TopicFilterType parseTopicFilterType(int sysFlag);
-
- int getQueueId();
- void setQueueId(int queueId);
-
- long long getBornTimestamp();
- void setBornTimestamp(long long bornTimestamp);
-
- sockaddr getBornHost();
- std::string getBornHostString();
- std::string getBornHostNameString();
- void setBornHost(const sockaddr& bornHost);
-
- long long getStoreTimestamp();
- void setStoreTimestamp(long long storeTimestamp);
-
- sockaddr getStoreHost();
- std::string getStoreHostString();
- void setStoreHost(const sockaddr& storeHost);
-
- std::string getMsgId();
- void setMsgId(const std::string& msgId);
-
- int getSysFlag();
- void setSysFlag(int sysFlag);
-
- int getBodyCRC();
- void setBodyCRC(int bodyCRC);
-
- long long getQueueOffset();
- void setQueueOffset(long long queueOffset);
-
- long long getCommitLogOffset();
- void setCommitLogOffset(long long physicOffset);
-
- int getStoreSize();
- void setStoreSize(int storeSize);
-
- int getReconsumeTimes();
- void setReconsumeTimes(int reconsumeTimes);
-
- long long getPreparedTransactionOffset();
- void setPreparedTransactionOffset(long long preparedTransactionOffset);
-
- std::string toString() const;
-
- private:
- long long m_queueOffset;
- long long m_commitLogOffset;
- long long m_bornTimestamp;
- long long m_storeTimestamp;
- long long m_preparedTransactionOffset;
- int m_queueId;
- int m_storeSize;
- int m_sysFlag;
- int m_bodyCRC;
- int m_reconsumeTimes;
- sockaddr m_bornHost;
- sockaddr m_storeHost;
- std::string m_msgId;
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/MessageListener.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MessageListener.h b/rocketmq-client4cpp/include/MessageListener.h
deleted file mode 100755
index 130a219..0000000
--- a/rocketmq-client4cpp/include/MessageListener.h
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __RMQ_MESSAGELISTENER_H__
-#define __RMQ_MESSAGELISTENER_H__
-
-#include <limits.h>
-#include <list>
-
-#include "MessageExt.h"
-#include "MessageQueue.h"
-
-namespace rmq
-{
- /**
- * Message Listener
- *
- */
- class MessageListener
- {
- public:
- virtual ~MessageListener(){}
- };
-
- enum ConsumeOrderlyStatus
- {
- SUCCESS,
- ROLLBACK,
- COMMIT,
- SUSPEND_CURRENT_QUEUE_A_MOMENT,
- };
-
- typedef struct tagConsumeOrderlyContext
- {
- tagConsumeOrderlyContext(MessageQueue& mq)
- :messageQueue(mq),
- autoCommit(true),
- suspendCurrentQueueTimeMillis(1000)
- {
- }
-
- MessageQueue messageQueue;///< Queue that the messages being consumed belong to
- bool autoCommit;///< Whether the message offset is committed automatically
- long suspendCurrentQueueTimeMillis;///< Initialised to 1000 by the constructor
- }ConsumeOrderlyContext;
-
- class MessageListenerOrderly : public MessageListener
- {
- public:
- virtual ConsumeOrderlyStatus consumeMessage(std::list<MessageExt*>& msgs,
- ConsumeOrderlyContext& context)=0;
- };
-
- enum ConsumeConcurrentlyStatus
- {
- CONSUME_SUCCESS,
- RECONSUME_LATER,
- };
-
- struct ConsumeConcurrentlyContext
- {
- ConsumeConcurrentlyContext(MessageQueue& mq)
- :messageQueue(mq),
- delayLevelWhenNextConsume(0),
- ackIndex(INT_MAX)
- {
- }
- MessageQueue messageQueue;///< Copy of the queue passed to the constructor
- int delayLevelWhenNextConsume;///< Initialised to 0; NOTE(review): presumably the retry delay level - confirm
- int ackIndex;///< Initialised to INT_MAX by the constructor
- };
-
- class MessageListenerConcurrently : public MessageListener
- {
- public:
- virtual ConsumeConcurrentlyStatus consumeMessage(std::list<MessageExt*>& msgs,
- ConsumeConcurrentlyContext& context)=0;
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/MessageQueue.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MessageQueue.h b/rocketmq-client4cpp/include/MessageQueue.h
deleted file mode 100755
index 89ddf58..0000000
--- a/rocketmq-client4cpp/include/MessageQueue.h
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __RMQ_MESSAGEQUEUE_H__
-#define __RMQ_MESSAGEQUEUE_H__
-
-#include <iostream>
-#include <string>
-#include <sstream>
-
-#include "RocketMQClient.h"
-
-namespace rmq
-{
- /**
- * Message Queue
- *
- */
- class MessageQueue
- {
- public:
- MessageQueue();
- ~MessageQueue(){};
-
- MessageQueue(const std::string& topic, const std::string& brokerName, int queueId);
-
- std::string getTopic()const;
- void setTopic(const std::string& topic);
-
- std::string getBrokerName()const;
- void setBrokerName(const std::string& brokerName);
-
- int getQueueId()const;
- void setQueueId(int queueId);
-
- int hashCode();
- std::string toString() const;
- std::string toJsonString() const;
-
- bool operator==(const MessageQueue& mq) const;
- bool operator<(const MessageQueue& mq) const;
- int compareTo(const MessageQueue& mq) const;
-
- private:
- std::string m_topic;
- std::string m_brokerName;
- int m_queueId;
- };
-
- inline std::ostream& operator<<(std::ostream& os, const MessageQueue& obj)
- {
- os << obj.toString();
- return os;
- }
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/MessageQueueListener.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MessageQueueListener.h b/rocketmq-client4cpp/include/MessageQueueListener.h
deleted file mode 100755
index 9f04c3e..0000000
--- a/rocketmq-client4cpp/include/MessageQueueListener.h
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Copyright (C) 2013 kangliqiang ,kangliq@163.com
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __RMQ_MESSAGEQUEUELISTENER_H__
-#define __RMQ_MESSAGEQUEUELISTENER_H__
-
-#include <set>
-#include "RocketMQClient.h"
-
-namespace rmq
-{
- /**
- * Message Queue Listener
- *
- */
- class MessageQueueListener
- {
- public:
- virtual ~MessageQueueListener() {}
- virtual void messageQueueChanged(const std::string& topic,
- std::set<MessageQueue>& mqAll,
- std::set<MessageQueue>& mqDivided)=0;
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/OffsetStore.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/OffsetStore.h b/rocketmq-client4cpp/include/OffsetStore.h
deleted file mode 100755
index a533750..0000000
--- a/rocketmq-client4cpp/include/OffsetStore.h
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __RMQ_OFFSETSTORE_H__
-#define __RMQ_OFFSETSTORE_H__
-
-#include <set>
-#include <map>
-
-#include "RocketMQClient.h"
-
-namespace rmq
-{
- class MessageQueue;
-
- enum ReadOffsetType
- {
- READ_FROM_MEMORY,
- READ_FROM_STORE,
- MEMORY_FIRST_THEN_STORE,
- };
-
- /**
- * Consumer Offset Store
- *
- */
- class OffsetStore
- {
- public:
- virtual ~OffsetStore() {}
-
- virtual void load()=0;
-
- virtual void updateOffset(const MessageQueue& mq, long long offset, bool increaseOnly)=0;
- virtual long long readOffset(const MessageQueue& mq, ReadOffsetType type)=0;
-
- virtual void persistAll(std::set<MessageQueue>& mqs)=0;
- virtual void persist(const MessageQueue& mq)=0;
-
- virtual void removeOffset(const MessageQueue& mq)=0;
-
- virtual std::map<MessageQueue, long long> cloneOffsetTable(const std::string& topic) = 0;
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/PullCallback.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/PullCallback.h b/rocketmq-client4cpp/include/PullCallback.h
deleted file mode 100755
index 47ade68..0000000
--- a/rocketmq-client4cpp/include/PullCallback.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Copyright (C) 2013 kangliqiang ,kangliq@163.com
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __RMQ_PULLCALLBACK_H__
-#define __RMQ_PULLCALLBACK_H__
-
-#include "RocketMQClient.h"
-#include "PullResult.h"
-
-namespace rmq
-{
- class MQException;
-
- /**
- * PullCallback
- *
- */
- class PullCallback
- {
- public:
- virtual ~PullCallback() {}
- virtual void onSuccess(PullResult& pullResult)=0;
- virtual void onException(MQException& e)=0;
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/PullResult.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/PullResult.h b/rocketmq-client4cpp/include/PullResult.h
deleted file mode 100755
index 42c13ca..0000000
--- a/rocketmq-client4cpp/include/PullResult.h
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __RMQ_PULLRESULT_H__
-#define __RMQ_PULLRESULT_H__
-
-#include <list>
-#include <string>
-#include <sstream>
-
-#include "RocketMQClient.h"
-#include "MessageExt.h"
-
-namespace rmq
-{
- enum PullStatus
- {
- FOUND,
- NO_NEW_MSG,
- NO_MATCHED_MSG,
- OFFSET_ILLEGAL
- };
-
- /**
- * Result of one pull: status, the three offsets and the list of pulled messages.
- * The destructor deletes every MessageExt* in msgFoundList, so do not copy a populated PullResult.
- */
- struct PullResult
- {
- PullResult()
- :pullStatus(NO_NEW_MSG), nextBeginOffset(0), minOffset(0), maxOffset(0)
- {
- }
-
- PullResult(PullStatus pullStatus,
- long long nextBeginOffset,
- long long minOffset,
- long long maxOffset,
- std::list<MessageExt*>& msgFoundList)
- :pullStatus(pullStatus),
- nextBeginOffset(nextBeginOffset),
- minOffset(minOffset),
- maxOffset(maxOffset),
- msgFoundList(msgFoundList)
- {
- // The list is copied pointer-for-pointer; this object deletes those messages on destruction.
- }
-
- ~PullResult()
- {
- std::list<MessageExt*>::iterator it = msgFoundList.begin();
- // Release every message held by this result.
- for (;it!=msgFoundList.end();++it)
- {
- delete *it;
- }
- }
-
- std::string toString() const
- {
- std::stringstream ss;
- ss << "{pullStatus=" << pullStatus
- << ",nextBeginOffset=" << nextBeginOffset
- << ",minOffset=" << minOffset
- << ",maxOffset=" << maxOffset
- << ",msgFoundList.size=" << msgFoundList.size()
- <<"}";
- return ss.str();
- }
-
- PullStatus pullStatus;///< NO_NEW_MSG when default-constructed
- long long nextBeginOffset;///< 0 when default-constructed
- long long minOffset;///< 0 when default-constructed
- long long maxOffset;///< 0 when default-constructed
- std::list<MessageExt*> msgFoundList;///< Raw pointers deleted by the destructor
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/QueryResult.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/QueryResult.h b/rocketmq-client4cpp/include/QueryResult.h
deleted file mode 100644
index 13164e4..0000000
--- a/rocketmq-client4cpp/include/QueryResult.h
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __RMQ_QUERYRESULT_H__
-#define __RMQ_QUERYRESULT_H__
-
-#include <list>
-
-#include "RocketMQClient.h"
-#include "MessageExt.h"
-
-namespace rmq
-{
- /**
- * QueryResult
- *
- */
- class QueryResult
- {
- public:
- // Stores the given timestamp and a copy of the message-pointer list.
- QueryResult(long long indexLastUpdateTimestamp, const std::list<MessageExt*>& messageList)
- : m_indexLastUpdateTimestamp(indexLastUpdateTimestamp),
- m_messageList(messageList)
- {
- }
-
- // Returns the timestamp supplied at construction.
- long long getIndexLastUpdateTimestamp() { return m_indexLastUpdateTimestamp; }
-
- // Returns a mutable reference to the stored list of message pointers.
- std::list<MessageExt*>& getMessageList()
- {
- return m_messageList;
- }
-
- private:
- long long m_indexLastUpdateTimestamp;
- std::list<MessageExt*> m_messageList;
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/RocketMQClient.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/RocketMQClient.h b/rocketmq-client4cpp/include/RocketMQClient.h
deleted file mode 100755
index e4c71c9..0000000
--- a/rocketmq-client4cpp/include/RocketMQClient.h
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __RMQ_ROCKETMQCLIENT_H__
-#define __RMQ_ROCKETMQCLIENT_H__
-
-#include <stdlib.h>
-#include <stdio.h>
-#include <stdint.h>
-#include <string.h>
-#include <assert.h>
-#include <time.h>
-#include <stdarg.h>
-#include <fcntl.h>
-#include <errno.h>
-#include <signal.h>
-#include <pthread.h>
-
-#include <sys/time.h>
-#include <sys/timeb.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <sys/file.h>
-#include <sys/syscall.h>
-#include <linux/unistd.h>
-
-#include <cstdio>
-#include <iostream>
-#include <string>
-#include <sstream>
-#include <vector>
-#include <map>
-#include <set>
-
-
-class RocketMQUtil
-{
-public:
- enum
- {
- NONE_LOG = 0,
- ERROR_LOG = 1,
- WARN_LOG = 2,
- INFO_LOG = 3,
- DEBUG_LOG = 4,
- };
-
-public:
- static pid_t getPid();
- static pid_t getTid();
-
- static int getDiffDays(time_t tmFirst, time_t tmSecond);
- static std::string tm2str(const time_t &t, const std::string &sFormat);
- static std::string now2str(const std::string &sFormat);
- static std::string now2str();
- static int64_t getNowMs();
- static std::string str2fmt(const char* format, ...)__attribute__((format(__printf__,1,2)));
-
- static int initLog(const std::string& sLogPath);
- static void setLogLevel(int logLevel);
- static void writeLog(const char* fmt, ...) __attribute__((format(__printf__,1,2)));
- static inline bool isNeedLog(int level)
- {
- return (level <= _logLevel);
- };
-
-public:
- static volatile int _logFd;
- static int _logLevel;
- static std::string _logPath;
-};
-
-#define RMQ_AUTO(name, value) __typeof__(value) name = value
-#define RMQ_FOR_EACH(container, it) \
- for(__typeof__((container).begin()) it = (container).begin();it!=(container).end(); ++it)
-// RMQ_AUTO / RMQ_FOR_EACH deduce types with __typeof__, the GNU spelling that also works in strict -std modes.
-// Logging macros: prefix "<pid>-<tid>|[time][file:func:line][LEVEL]|", then fmt, then "\n"; fmt must be a string literal.
-// RocketMQUtil::writeLog is called only when RocketMQUtil::isNeedLog accepts the level. Spaces around fmt are required since C++11.
-#define RMQ_DEBUG(fmt, args...) do{ if(RocketMQUtil::isNeedLog(RocketMQUtil::DEBUG_LOG)) RocketMQUtil::writeLog("%d-%d|[%s][%s:%s:%d][DEBUG]|" fmt "\n", RocketMQUtil::getPid(), RocketMQUtil::getTid(), RocketMQUtil::now2str().c_str(), __FILE__, __func__,__LINE__, ##args);}while(0)
-#define RMQ_INFO(fmt, args...) do{ if(RocketMQUtil::isNeedLog(RocketMQUtil::INFO_LOG)) RocketMQUtil::writeLog("%d-%d|[%s][%s:%s:%d][INFO]|" fmt "\n", RocketMQUtil::getPid(), RocketMQUtil::getTid(), RocketMQUtil::now2str().c_str(), __FILE__, __func__, __LINE__, ##args);}while(0)
-#define RMQ_WARN(fmt, args...) do{ if(RocketMQUtil::isNeedLog(RocketMQUtil::WARN_LOG)) RocketMQUtil::writeLog("%d-%d|[%s][%s:%s:%d][WARN]|" fmt "\n", RocketMQUtil::getPid(), RocketMQUtil::getTid(), RocketMQUtil::now2str().c_str(), __FILE__, __func__, __LINE__, ##args);}while(0)
-#define RMQ_ERROR(fmt, args...) do{ if(RocketMQUtil::isNeedLog(RocketMQUtil::ERROR_LOG)) RocketMQUtil::writeLog("%d-%d|[%s][%s:%s:%d][ERROR]|" fmt "\n", RocketMQUtil::getPid(), RocketMQUtil::getTid(), RocketMQUtil::now2str().c_str(), __FILE__, __func__, __LINE__, ##args);}while(0)
-// RMQ_PRINT writes straight to stdout through printf, with no level check and a fixed [DEBUG] tag.
-#define RMQ_PRINT(fmt, args...) do{ printf("%d|[%s][%s:%s:%d][DEBUG]|" fmt "\n", RocketMQUtil::getTid(), RocketMQUtil::now2str().c_str(), __FILE__, __func__,__LINE__, ##args);}while(0)
-
-
-#endif
-
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/SendCallback.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/SendCallback.h b/rocketmq-client4cpp/include/SendCallback.h
deleted file mode 100755
index 0feb5a1..0000000
--- a/rocketmq-client4cpp/include/SendCallback.h
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __RMQ_SENDCALLBACK_H__
-#define __RMQ_SENDCALLBACK_H__
-
-#include "SendResult.h"
-#include "RocketMQClient.h"
-
-namespace rmq
-{
- class MQException;
-
- /**
- * Send Mesage Callback
- *
- */
- class SendCallback
- {
- public:
- virtual ~SendCallback() {}
- virtual void onSuccess(SendResult& sendResult)=0;
- virtual void onException(MQException& e)=0;
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/SendMessageHook.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/SendMessageHook.h b/rocketmq-client4cpp/include/SendMessageHook.h
deleted file mode 100644
index 9869aa6..0000000
--- a/rocketmq-client4cpp/include/SendMessageHook.h
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __RMQ_SENDMESSAGEHOOK_H__
-#define __RMQ_SENDMESSAGEHOOK_H__
-
-#include <string>
-
-#include "RocketMQClient.h"
-#include "Message.h"
-#include "MQClientException.h"
-
-namespace rmq
-{
- class SendMessageContext
- {
- public:
- std::string producerGroup;
- Message msg;
- MessageQueue mq;
- std::string brokerAddr;
- CommunicationMode communicationMode;
- SendResult sendResult;
- MQException* pException;
- void* pArg;
- };
-
- class SendMessageHook
- {
- public:
- virtual ~SendMessageHook() {}
- virtual std::string hookName()=0;
- virtual void sendMessageBefore(const SendMessageContext& context)=0;
- virtual void sendMessageAfter(const SendMessageContext& context)=0;
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/SendResult.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/SendResult.h b/rocketmq-client4cpp/include/SendResult.h
deleted file mode 100755
index d6a3174..0000000
--- a/rocketmq-client4cpp/include/SendResult.h
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __RMQ_SENDRESULT_H__
-#define __RMQ_SENDRESULT_H__
-
-#include "RocketMQClient.h"
-#include "MessageQueue.h"
-
-namespace rmq
-{
- enum SendStatus
- {
- SEND_OK,
- FLUSH_DISK_TIMEOUT,
- FLUSH_SLAVE_TIMEOUT,
- SLAVE_NOT_AVAILABLE
- };
-
- /**
- * Send Message Result
- *
- */
- class SendResult
- {
- public:
- SendResult();
- SendResult(const SendStatus& sendStatus,
- const std::string& msgId,
- MessageQueue& messageQueue,
- long long queueOffset,
- std::string& projectGroupPrefix);
-
- const std::string& getMsgId();
- void setMsgId(const std::string& msgId);
- SendStatus getSendStatus();
- void setSendStatus(const SendStatus& sendStatus);
- MessageQueue& getMessageQueue();
- void setMessageQueue(MessageQueue& messageQueue);
- long long getQueueOffset();
- void setQueueOffset(long long queueOffset);
- bool hasResult();
-
- std::string toString() const;
- std::string toJsonString() const;
-
- private:
- SendStatus m_sendStatus;
- std::string m_msgId;
- MessageQueue m_messageQueue;
- long long m_queueOffset;
- };
-
- enum LocalTransactionState
- {
- COMMIT_MESSAGE,
- ROLLBACK_MESSAGE,
- UNKNOW,
- };
-
- /**
- * Send transaction message result
- *
- */
- class TransactionSendResult : public SendResult
- {
- public:
- TransactionSendResult();
- LocalTransactionState getLocalTransactionState();
- void setLocalTransactionState(LocalTransactionState localTransactionState);
-
- private:
- LocalTransactionState m_localTransactionState;
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/include/TopicFilterType.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/TopicFilterType.h b/rocketmq-client4cpp/include/TopicFilterType.h
deleted file mode 100755
index e51ae20..0000000
--- a/rocketmq-client4cpp/include/TopicFilterType.h
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Copyright (C) 2013 kangliqiang ,kangliq@163.com
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __RMQ_TOPICFILTERTYPE_H__
-#define __RMQ_TOPICFILTERTYPE_H__
-
-namespace rmq
-{
- /**
- * Topic filter type
- *
- */
- enum TopicFilterType
- {
- SINGLE_TAG,
- MULTI_TAG
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/rocketmq.mk
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/rocketmq.mk b/rocketmq-client4cpp/rocketmq.mk
deleted file mode 100644
index eecc458..0000000
--- a/rocketmq-client4cpp/rocketmq.mk
+++ /dev/null
@@ -1,6 +0,0 @@
-ROCKETMQ_PATH := /data/libs/rocketmq
-
-INCLUDE += -I$(ROCKETMQ_PATH)/include
-INCLUDE_32 += -I$(ROCKETMQ_PATH)/include -march=i686
-LIB_32 += -L$(ROCKETMQ_PATH)/lib32 -lrocketmq -lz -lrt -lpthread
-LIB_64 += -L$(ROCKETMQ_PATH)/lib64 -lrocketmq -lz -lrt -lpthread
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/ClientConfig.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/ClientConfig.cpp b/rocketmq-client4cpp/src/ClientConfig.cpp
deleted file mode 100755
index 986d67d..0000000
--- a/rocketmq-client4cpp/src/ClientConfig.cpp
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Copyright (C) 2010-2013 kangliqiang, kangliq@163.com
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include <stdlib.h>
-#include <sstream>
-
-#include "MQClientException.h"
-#include "SocketUtil.h"
-#include "ClientConfig.h"
-#include "UtilAll.h"
-#include "MixAll.h"
-
-namespace rmq
-{
-
-ClientConfig::ClientConfig()
-{
- // Seed the name-server address from the environment variable named by
- // MixAll::NAMESRV_ADDR_ENV; use an empty address when it is not set.
- const char* envAddr = getenv(MixAll::NAMESRV_ADDR_ENV.c_str());
- m_namesrvAddr = (envAddr != NULL) ? envAddr : "";
-
- // Identity defaults: local address, the literal instance name "DEFAULT",
- // and one callback thread per processor reported by UtilAll.
- m_clientIP = getLocalAddress();
- m_instanceName = "DEFAULT";
- m_clientCallbackExecutorThreads = UtilAll::availableProcessors();
-
- // Interval defaults: 30000, 30000 and 5000.
- // NOTE(review): presumably milliseconds - confirm against the header.
- m_pollNameServerInterval = 30 * 1000;
- m_heartbeatBrokerInterval = 30 * 1000;
- m_persistConsumerOffsetInterval = 5 * 1000;
-}
-
-ClientConfig::~ClientConfig()
-{
-}
-
-std::string ClientConfig::buildMQClientId()
-{
- return m_clientIP + "@" + m_instanceName;
-}
-
-void ClientConfig::changeInstanceNameToPID()
-{
- if (m_instanceName == "DEFAULT")
- {
- m_instanceName = UtilAll::toString(UtilAll::getPid());
- }
-}
-
-
-void ClientConfig::resetClientConfig(const ClientConfig& cc)
-{
- m_namesrvAddr = cc.m_namesrvAddr;
- m_clientIP = cc.m_clientIP;
- m_instanceName = cc.m_instanceName;
- m_clientCallbackExecutorThreads = cc.m_clientCallbackExecutorThreads;
- m_pollNameServerInterval = cc.m_pollNameServerInterval;
- m_heartbeatBrokerInterval = cc.m_heartbeatBrokerInterval;
- m_persistConsumerOffsetInterval = cc.m_persistConsumerOffsetInterval;
-}
-
-ClientConfig ClientConfig::cloneClientConfig()
-{
- return *this;
-}
-
-std::string ClientConfig::getNamesrvAddr()
-{
- return m_namesrvAddr;
-}
-
-void ClientConfig::setNamesrvAddr(const std::string& namesrvAddr)
-{
- m_namesrvAddr = namesrvAddr;
-}
-
-std::string ClientConfig::getClientIP()
-{
- return m_clientIP;
-}
-
-void ClientConfig::setClientIP(const std::string& clientIP)
-{
- m_clientIP = clientIP;
-}
-
-std::string ClientConfig::getInstanceName()
-{
- return m_instanceName;
-}
-
-void ClientConfig::setInstanceName(const std::string& instanceName)
-{
- m_instanceName = instanceName;
-}
-
-int ClientConfig::getClientCallbackExecutorThreads()
-{
- return m_clientCallbackExecutorThreads;
-}
-
-void ClientConfig::setClientCallbackExecutorThreads(int clientCallbackExecutorThreads)
-{
- m_clientCallbackExecutorThreads = clientCallbackExecutorThreads;
-}
-
-int ClientConfig::getPollNameServerInterval()
-{
- return m_pollNameServerInterval;
-}
-
-void ClientConfig::setPollNameServerInterval(int pollNameServerInterval)
-{
- m_pollNameServerInterval = pollNameServerInterval;
-}
-
-int ClientConfig::getHeartbeatBrokerInterval()
-{
- return m_heartbeatBrokerInterval;
-}
-
-void ClientConfig::setHeartbeatBrokerInterval(int heartbeatBrokerInterval)
-{
- m_heartbeatBrokerInterval = heartbeatBrokerInterval;
-}
-
-int ClientConfig:: getPersistConsumerOffsetInterval()
-{
- return m_persistConsumerOffsetInterval;
-}
-
-void ClientConfig::setPersistConsumerOffsetInterval(int persistConsumerOffsetInterval)
-{
- m_persistConsumerOffsetInterval = persistConsumerOffsetInterval;
-}
-
-
-std::string ClientConfig::toString() const
-{
- std::stringstream ss;
- ss << "{namesrvAddr=" << m_namesrvAddr
- << ",clientIP=" << m_clientIP
- << ",instanceName=" << m_instanceName
- << ",clientCallbackExecutorThreads=" << m_clientCallbackExecutorThreads
- << ",pollNameServerInteval=" << m_pollNameServerInterval
- << ",heartbeatBrokerInterval=" << m_heartbeatBrokerInterval
- << ",persistConsumerOffsetInterval=" << m_persistConsumerOffsetInterval
- <<"}";
- return ss.str();
-}
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/ClientRemotingProcessor.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/ClientRemotingProcessor.cpp b/rocketmq-client4cpp/src/ClientRemotingProcessor.cpp
deleted file mode 100755
index ae88de5..0000000
--- a/rocketmq-client4cpp/src/ClientRemotingProcessor.cpp
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#include "ClientRemotingProcessor.h"
-#include "MQProtos.h"
-#include "TcpTransport.h"
-#include "RemotingCommand.h"
-#include "MQClientFactory.h"
-#include "CommandCustomHeader.h"
-#include "ConsumerRunningInfo.h"
-
-
-
-namespace rmq
-{
-
-/**
- * Builds a processor bound to the given client factory.
- *
- * @param pMQClientFactory pointer stored in m_pMQClientFactory; it is only
- *                         kept here, not copied or otherwise used.
- */
-ClientRemotingProcessor::ClientRemotingProcessor(MQClientFactory* pMQClientFactory)
-    : m_pMQClientFactory(pMQClientFactory)
-{
-}
-
-/**
- * Routes a request to the handler that matches its request code.
- *
- * @param pTts     transport passed through unchanged to the handler.
- * @param pRequest request whose getCode() selects the handler.
- * @return whatever the selected handler returns, or NULL when the code
- *         matches none of the cases below.
- */
-RemotingCommand* ClientRemotingProcessor::processRequest(TcpTransport* pTts, RemotingCommand* pRequest)
-{
-    const int requestCode = pRequest->getCode();
-
-    switch (requestCode)
-    {
-        case CHECK_TRANSACTION_STATE_VALUE:
-            return checkTransactionState(pTts, pRequest);
-
-        case NOTIFY_CONSUMER_IDS_CHANGED_VALUE:
-            return notifyConsumerIdsChanged(pTts, pRequest);
-
-        case RESET_CONSUMER_CLIENT_OFFSET_VALUE:
-            return resetOffset(pTts, pRequest);
-
-        case GET_CONSUMER_STATUS_FROM_CLIENT_VALUE:
-            return getConsumeStatus(pTts, pRequest);
-
-        case GET_CONSUMER_RUNNING_INFO_VALUE:
-            return getConsumerRunningInfo(pTts, pRequest);
-
-        case CONSUME_MESSAGE_DIRECTLY_VALUE:
-            return consumeMessageDirectly(pTts, pRequest);
-
-        default:
-            // Unrecognised request code: nothing to send back.
-            return NULL;
-    }
-}
-
-/**
- * Handler reached from processRequest for CHECK_TRANSACTION_STATE_VALUE.
- * Not implemented: both arguments are ignored.
- *
- * @return always NULL.
- */
-RemotingCommand* ClientRemotingProcessor::checkTransactionState(TcpTransport* pTts, RemotingCommand* pRequest)
-{
-    // TODO: not implemented yet.
-    (void)pTts;
-    (void)pRequest;
-    return NULL;
-}
-
-/**
- * Handles a notification that the consumer ids of a group changed: logs the
- * notification and asks the client factory to rebalance immediately.
- *
- * Any std::exception raised while doing so is logged and swallowed.
- *
- * @param pTts     transport the notification arrived on; only its server
- *                 address is read, for the log line.
- * @param pRequest request whose custom header carries the consumer group.
- * @return always NULL.
- */
-RemotingCommand* ClientRemotingProcessor::notifyConsumerIdsChanged(TcpTransport* pTts, RemotingCommand* pRequest)
-{
-    try
-    {
-        NotifyConsumerIdsChangedRequestHeader* extHeader =
-            (NotifyConsumerIdsChangedRequestHeader*)pRequest->getCommandCustomHeader();
-
-        // The header is only needed for the log line, so a missing header must
-        // neither crash the handler nor prevent the rebalance below.
-        const char* consumerGroup = (extHeader != NULL) ? extHeader->consumerGroup.c_str() : "";
-
-        RMQ_INFO("receive broker's notification[{%s}], the consumer group: {%s} changed, rebalance immediately",
-                 pTts->getServerAddr().c_str(),
-                 consumerGroup);
-        m_pMQClientFactory->rebalanceImmediately();
-    }
-    catch (const std::exception& e)
-    {
-        RMQ_ERROR("notifyConsumerIdsChanged exception: %s", e.what());
-    }
-
-    return NULL;
-}
-
-/**
- * Handler reached from processRequest for RESET_CONSUMER_CLIENT_OFFSET_VALUE.
- * Not implemented: both arguments are ignored.
- *
- * @return always NULL.
- */
-RemotingCommand* ClientRemotingProcessor::resetOffset(TcpTransport* pTts, RemotingCommand* pRequest)
-{
-    // TODO: not implemented yet.
-    (void)pTts;
-    (void)pRequest;
-    return NULL;
-}
-
-
-/**
- * Handler reached from processRequest for GET_CONSUMER_STATUS_FROM_CLIENT_VALUE.
- * Not implemented: both arguments are ignored.
- *
- * @return always NULL.
- */
-RemotingCommand* ClientRemotingProcessor::getConsumeStatus(TcpTransport* pTts, RemotingCommand* pRequest)
-{
-    // TODO: not implemented yet.
-    (void)pTts;
-    (void)pRequest;
-    return NULL;
-}
-
-
-RemotingCommand* ClientRemotingProcessor::getConsumerRunningInfo(TcpTransport* pTts, RemotingCommand* pRequest)
-{
- return NULL;
-
- /*
- NOTE: everything inside this comment is disabled reference material. The
- function returns NULL unconditionally above, ignoring both arguments, so
- none of this runs. It holds an unfinished C++ draft (it mixes pointer and
- Java-style member access and refers to an undeclared pCmd), followed by a
- Java version after the "// java" marker.
-
- GetConsumerRunningInfoRequestHeader* requestHeader = (GetConsumerRunningInfoRequestHeader)pRequest->getCommandCustomHeader();
- RemotingCommand* pResponse = RemotingCommand::createResponseCommand(NULL);
-
- pResponse = RemotingCommand::createResponseCommand(
- REQUEST_CODE_NOT_SUPPORTED_VALUE, "request type not supported", NULL);
- pResponse->setOpaque(pCmd->getOpaque());
-
- ConsumerRunningInfo* consumerRunningInfo = m_pMQClientFactory->consumerRunningInfo(requestHeader->consumerGroup);
- if (NULL != consumerRunningInfo) {
- response.setCode(ResponseCode.SUCCESS);
- response.setBody(consumerRunningInfo.encode());
- } else {
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer",
- requestHeader.getConsumerGroup()));
- }
- return pResponse;
-
- // java
- final RemotingCommand response = RemotingCommand.createResponseCommand(null);
- final GetConsumerRunningInfoRequestHeader requestHeader =
- (GetConsumerRunningInfoRequestHeader) request
- .decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
-
- ConsumerRunningInfo consumerRunningInfo =
- this.mqClientFactory.consumerRunningInfo(requestHeader.getConsumerGroup());
- if (null != consumerRunningInfo) {
- if (requestHeader.isJstackEnable()) {
- String jstack = UtilAll.jstack();
- consumerRunningInfo.setJstack(jstack);
- }
-
- response.setCode(ResponseCode.SUCCESS);
- response.setBody(consumerRunningInfo.encode());
- } else {
- response.setCode(ResponseCode.SYSTEM_ERROR);
- response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer",
- requestHeader.getConsumerGroup()));
- }
-
- return response;
- */
-}
-
-
-/**
- * Handler reached from processRequest for CONSUME_MESSAGE_DIRECTLY_VALUE.
- * Not implemented: both arguments are ignored.
- *
- * @return always NULL.
- */
-RemotingCommand* ClientRemotingProcessor::consumeMessageDirectly(TcpTransport* pTts, RemotingCommand* pRequest)
-{
-    // TODO: not implemented yet.
-    (void)pTts;
-    (void)pRequest;
-    return NULL;
-}
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/ClientRemotingProcessor.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/ClientRemotingProcessor.h b/rocketmq-client4cpp/src/ClientRemotingProcessor.h
deleted file mode 100755
index 4cd2873..0000000
--- a/rocketmq-client4cpp/src/ClientRemotingProcessor.h
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __CLIENTREMOTINGPROCESSOR_H__
-#define __CLIENTREMOTINGPROCESSOR_H__
-
-#include "TcpRequestProcessor.h"
-
-namespace rmq
-{
- class MQClientFactory;
- class RemotingCommand;
-
- /**
-  * Request processor that receives a RemotingCommand together with the
-  * TcpTransport it arrived on and hands it to one handler per request kind.
-  */
- class ClientRemotingProcessor : public TcpRequestProcessor
- {
- public:
-     /** Keeps the given factory pointer for later use by the handlers. */
-     ClientRemotingProcessor(MQClientFactory* pMQClientFactory);
-
-     /** Entry point: selects one of the handlers below for the request. */
-     RemotingCommand* processRequest(TcpTransport* pTts, RemotingCommand* pRequest);
-
-     // One handler per request kind; each takes the same arguments as
-     // processRequest and returns a RemotingCommand pointer.
-     RemotingCommand* checkTransactionState(TcpTransport* pTts, RemotingCommand* pRequest);
-     RemotingCommand* notifyConsumerIdsChanged(TcpTransport* pTts, RemotingCommand* pRequest);
-     RemotingCommand* resetOffset(TcpTransport* pTts, RemotingCommand* pRequest);
-     RemotingCommand* getConsumeStatus(TcpTransport* pTts, RemotingCommand* pRequest);
-     RemotingCommand* getConsumerRunningInfo(TcpTransport* pTts, RemotingCommand* pRequest);
-     RemotingCommand* consumeMessageDirectly(TcpTransport* pTts, RemotingCommand* pRequest);
-
- private:
-     /** Factory supplied at construction. */
-     MQClientFactory* m_pMQClientFactory;
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/CommunicationMode.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/CommunicationMode.h b/rocketmq-client4cpp/src/CommunicationMode.h
deleted file mode 100755
index 43b2941..0000000
--- a/rocketmq-client4cpp/src/CommunicationMode.h
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Copyright (C) 2013 kangliqiang ,kangliq@163.com
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __COMMUNICATIONMODE_H__
-#define __COMMUNICATIONMODE_H__
-
-namespace rmq
-{
- /**
- * Communication Mode
- *
- */
- enum CommunicationMode
- {
-     // Values spelled out; they equal the implicit enumerator values.
-     SYNC = 0,
-     ASYNC = 1,
-     ONEWAY = 2
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/FindBrokerResult.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/FindBrokerResult.h b/rocketmq-client4cpp/src/FindBrokerResult.h
deleted file mode 100644
index 51a9845..0000000
--- a/rocketmq-client4cpp/src/FindBrokerResult.h
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-#ifndef __FINDBROKERRESULT_H__
-#define __FINDBROKERRESULT_H__
-
-namespace rmq
-{
- /**
-  * Plain value pair describing a located broker.
-  */
- struct FindBrokerResult
- {
-     std::string brokerAddr; // broker address text
-     bool slave;             // presumably true when the address is a slave broker - confirm against callers
- };
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/MQAdminImpl.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/MQAdminImpl.cpp b/rocketmq-client4cpp/src/MQAdminImpl.cpp
deleted file mode 100755
index 2a6b597..0000000
--- a/rocketmq-client4cpp/src/MQAdminImpl.cpp
+++ /dev/null
@@ -1,295 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#include <list>
-#include "SocketUtil.h"
-#include "MQAdminImpl.h"
-#include "MQClientFactory.h"
-#include "MQClientAPIImpl.h"
-#include "MQClientException.h"
-#include "TopicConfig.h"
-#include "TopicPublishInfo.h"
-#include "MessageId.h"
-#include "MessageDecoder.h"
-
-namespace rmq
-{
-
-
-/**
- * Builds an admin helper bound to the given client factory.
- *
- * @param pMQClientFactory pointer stored in m_pMQClientFactory.
- */
-MQAdminImpl::MQAdminImpl(MQClientFactory* pMQClientFactory)
-    : m_pMQClientFactory(pMQClientFactory)
-{
-}
-
-/**
- * Destructor with an empty body; in particular it does not delete
- * m_pMQClientFactory.
- */
-MQAdminImpl::~MQAdminImpl()
-{
-}
-
-/**
- * Convenience overload: forwards to the four-argument createTopic with a
- * topicSysFlag of 0.
- */
-void MQAdminImpl::createTopic(const std::string& key, const std::string& newTopic,
-                              int queueNum)
-{
-    const int noSysFlag = 0;
-    createTopic(key, newTopic, queueNum, noSysFlag);
-}
-
-
-/**
- * Asks every broker found in the route of `key` to create topic `newTopic`.
- *
- * The route for `key` is fetched from the name server (3 second timeout).
- * For each broker in that route that has an address registered under
- * MixAll::MASTER_ID, a create-topic request is sent to that address. A
- * failure on one broker does not stop the loop; the last failure is
- * rethrown once all brokers have been tried.
- *
- * @param key          topic whose route determines the target brokers.
- * @param newTopic     name of the topic to create.
- * @param queueNum     used for both the read and the write queue count.
- * @param topicSysFlag stored in the topic config as the system flag.
- * @throws MQClientException "create new topic failed" when the route is
- *         missing, contains no broker, or any create request failed.
- */
-void MQAdminImpl::createTopic(const std::string& key, const std::string& newTopic,
-                              int queueNum, int topicSysFlag)
-{
-    try
-    {
-        MQClientAPIImpl* api = m_pMQClientFactory->getMQClientAPIImpl();
-        TopicRouteDataPtr topicRouteData = api->getTopicRouteInfoFromNameServer(key, 1000 * 3);
-
-        // The other lookups in this class guard against an empty route
-        // pointer before dereferencing it; do the same here.
-        if (topicRouteData.ptr() == NULL)
-        {
-            THROW_MQEXCEPTION(MQClientException, "Not found broker, maybe key is wrong", -1);
-        }
-
-        // Work on a copy so that sorting does not touch the route data.
-        std::list<BrokerData> brokerDataList = topicRouteData->getBrokerDatas();
-        if (brokerDataList.empty())
-        {
-            THROW_MQEXCEPTION(MQClientException, "Not found broker, maybe key is wrong", -1);
-        }
-
-        brokerDataList.sort();
-
-        MQClientException lastError("", 0, "", 0);
-        bool hasError = false;
-
-        for (std::list<BrokerData>::iterator it = brokerDataList.begin();
-             it != brokerDataList.end(); ++it)
-        {
-            std::map<int, std::string>::iterator master = it->brokerAddrs.find(MixAll::MASTER_ID);
-            if (master == it->brokerAddrs.end())
-            {
-                // No address under MASTER_ID for this broker: skip it.
-                continue;
-            }
-
-            const std::string addr = master->second;
-
-            TopicConfig topicConfig(newTopic);
-            topicConfig.setReadQueueNums(queueNum);
-            topicConfig.setWriteQueueNums(queueNum);
-            topicConfig.setTopicSysFlag(topicSysFlag);
-
-            try
-            {
-                api->createTopic(addr, key, topicConfig, 1000 * 3);
-            }
-            catch (const MQClientException& e)
-            {
-                // Remember the failure but keep going with the other brokers.
-                hasError = true;
-                lastError = e;
-            }
-        }
-
-        if (hasError)
-        {
-            throw lastError;
-        }
-    }
-    catch (const MQClientException&)
-    {
-        THROW_MQEXCEPTION(MQClientException, "create new topic failed", -1);
-    }
-}
-
-/**
- * Fetches the route of `topic` from the name server (3 second timeout) and
- * returns a copy of the message queue list of the derived publish info.
- *
- * @param topic topic to look up.
- * @return a vector allocated with new; it is handed to the caller as is.
- * @throws MQClientException when the lookup throws an MQClientException, or
- *         when the route or publish info is missing or not ok().
- */
-std::vector<MessageQueue>* MQAdminImpl::fetchPublishMessageQueues(const std::string& topic)
-{
-    try
-    {
-        MQClientAPIImpl* api = m_pMQClientFactory->getMQClientAPIImpl();
-        TopicRouteDataPtr topicRouteData = api->getTopicRouteInfoFromNameServer(topic, 1000 * 3);
-
-        if (topicRouteData.ptr() != NULL)
-        {
-            TopicPublishInfoPtr topicPublishInfo =
-                MQClientFactory::topicRouteData2TopicPublishInfo(topic, *topicRouteData);
-            if (topicPublishInfo.ptr() != NULL && topicPublishInfo->ok())
-            {
-                // Copy-construct inside the new-expression: if the copy throws,
-                // the storage is released automatically instead of leaking.
-                return new std::vector<MessageQueue>(topicPublishInfo->getMessageQueueList());
-            }
-        }
-    }
-    catch (const MQClientException&)
-    {
-        THROW_MQEXCEPTION(MQClientException, "Can not find Message Queue for this topic, " + topic, -1);
-    }
-
-    THROW_MQEXCEPTION(MQClientException, "Unknow why, Can not find Message Queue for this topic, " + topic, -1);
-}
-
-/**
- * Fetches the route of `topic` from the name server (3 second timeout) and
- * returns the set of message queues derived from it for subscription.
- *
- * @param topic topic to look up.
- * @return the non-empty set produced by
- *         MQClientFactory::topicRouteData2TopicSubscribeInfo, handed to the
- *         caller as is.
- * @throws MQClientException when the lookup throws an MQClientException, the
- *         route is missing, or the derived set is missing or empty.
- */
-std::set<MessageQueue>* MQAdminImpl::fetchSubscribeMessageQueues(const std::string& topic)
-{
-    try
-    {
-        TopicRouteDataPtr topicRouteData =
-            m_pMQClientFactory->getMQClientAPIImpl()->getTopicRouteInfoFromNameServer(topic, 1000 * 3);
-        if (topicRouteData.ptr() != NULL)
-        {
-            std::set<MessageQueue>* mqList =
-                MQClientFactory::topicRouteData2TopicSubscribeInfo(topic, *topicRouteData);
-            if (mqList != NULL && !mqList->empty())
-            {
-                return mqList;
-            }
-
-            // An unusable set is not returned to anyone, so release it here
-            // instead of leaking it.
-            // NOTE(review): assumes the set was allocated with new and that
-            // ownership passed to this function - confirm against
-            // MQClientFactory::topicRouteData2TopicSubscribeInfo.
-            delete mqList;
-            THROW_MQEXCEPTION(MQClientException, "Can not find Message Queue for this topic: " + topic, -1);
-        }
-    }
-    catch (const MQClientException&)
-    {
-        THROW_MQEXCEPTION(MQClientException, "Can not find Message Queue for this topic: " + topic, -1);
-    }
-
-    THROW_MQEXCEPTION(MQClientException, "Unknow why, Can not find Message Queue for this topic: " + topic, -1);
-}
-
-/**
- * Asks the broker that hosts `mq` for the offset matching `timestamp`.
- *
- * The broker address is resolved by broker name; when it is unknown, the
- * route of the queue's topic is refreshed from the name server and the
- * lookup is repeated once. The broker call uses a 3 second timeout.
- *
- * @param mq        queue to search in.
- * @param timestamp passed through to the broker unchanged.
- * @return the offset reported by the broker.
- * @throws MQClientException when no broker address can be resolved or the
- *         broker call throws an MQClientException.
- */
-long long MQAdminImpl::searchOffset(const MessageQueue& mq, long long timestamp)
-{
-    std::string brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
-    if (brokerAddr.empty())
-    {
-        // Unknown broker: refresh the route once, then look it up again.
-        m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic());
-        brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
-    }
-
-    if (!brokerAddr.empty())
-    {
-        try
-        {
-            return m_pMQClientFactory->getMQClientAPIImpl()->searchOffset(
-                brokerAddr, mq.getTopic(), mq.getQueueId(), timestamp, 1000 * 3);
-        }
-        catch (const MQClientException&)
-        {
-            THROW_MQEXCEPTION(MQClientException, "Invoke Broker[" + brokerAddr + "] exception", -1);
-        }
-    }
-
-    THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1);
-}
-
-/**
- * Asks the broker that hosts `mq` for the queue's maximum offset.
- *
- * The broker address is resolved by broker name; when it is unknown, the
- * route of the queue's topic is refreshed from the name server and the
- * lookup is repeated once. The broker call uses a 3 second timeout.
- *
- * @param mq queue to query.
- * @return the offset reported by the broker.
- * @throws MQClientException when no broker address can be resolved or the
- *         broker call throws an MQClientException.
- */
-long long MQAdminImpl::maxOffset(const MessageQueue& mq)
-{
-    std::string brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
-    if (brokerAddr.empty())
-    {
-        // Unknown broker: refresh the route once, then look it up again.
-        m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic());
-        brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
-    }
-
-    if (!brokerAddr.empty())
-    {
-        try
-        {
-            return m_pMQClientFactory->getMQClientAPIImpl()->getMaxOffset(
-                brokerAddr, mq.getTopic(), mq.getQueueId(), 1000 * 3);
-        }
-        catch (const MQClientException&)
-        {
-            THROW_MQEXCEPTION(MQClientException, "Invoke Broker[" + brokerAddr + "] exception", -1);
-        }
-    }
-
-    THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1);
-}
-
-/**
- * Asks the broker that hosts `mq` for the queue's minimum offset.
- *
- * The broker address is resolved by broker name; when it is unknown, the
- * route of the queue's topic is refreshed from the name server and the
- * lookup is repeated once. The broker call uses a 3 second timeout.
- *
- * @param mq queue to query.
- * @return the offset reported by the broker.
- * @throws MQClientException when no broker address can be resolved or the
- *         broker call throws an MQClientException.
- */
-long long MQAdminImpl::minOffset(const MessageQueue& mq)
-{
-    std::string brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
-    if (brokerAddr.empty())
-    {
-        // Unknown broker: refresh the route once, then look it up again.
-        m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic());
-        brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
-    }
-
-    if (!brokerAddr.empty())
-    {
-        try
-        {
-            return m_pMQClientFactory->getMQClientAPIImpl()->getMinOffset(
-                brokerAddr, mq.getTopic(), mq.getQueueId(), 1000 * 3);
-        }
-        catch (const MQClientException&)
-        {
-            THROW_MQEXCEPTION(MQClientException, "Invoke Broker[" + brokerAddr + "] exception", -1);
-        }
-    }
-
-    THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1);
-}
-
-/**
- * Asks the broker that hosts `mq` for the earliest message store time.
- *
- * The broker address is resolved by broker name; when it is unknown, the
- * route of the queue's topic is refreshed from the name server and the
- * lookup is repeated once. The broker call uses a 3 second timeout.
- *
- * @param mq queue to query.
- * @return the value reported by the broker.
- * @throws MQClientException when no broker address can be resolved or the
- *         broker call throws an MQClientException.
- */
-long long MQAdminImpl::earliestMsgStoreTime(const MessageQueue& mq)
-{
-    std::string brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
-    if (brokerAddr.empty())
-    {
-        // Unknown broker: refresh the route once, then look it up again.
-        m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic());
-        brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
-    }
-
-    if (!brokerAddr.empty())
-    {
-        try
-        {
-            return m_pMQClientFactory->getMQClientAPIImpl()->getEarliestMsgStoretime(
-                brokerAddr, mq.getTopic(), mq.getQueueId(), 1000 * 3);
-        }
-        catch (const MQClientException&)
-        {
-            THROW_MQEXCEPTION(MQClientException, "Invoke Broker[" + brokerAddr + "] exception", -1);
-        }
-    }
-
-    THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1);
-}
-
-/**
- * Fetches one message by its id.
- *
- * The id is decoded into an address and an offset, and the message is
- * requested from that address with a 3 second timeout.
- *
- * @param msgId message id to decode.
- * @return the pointer returned by the client API's viewMessage call.
- * @throws MQClientException "message id illegal" when an
- *         UnknownHostException is raised while decoding or fetching.
- */
-MessageExt* MQAdminImpl::viewMessage(const std::string& msgId)
-{
-    try
-    {
-        MessageId messageId = MessageDecoder::decodeMessageId(msgId);
-        return m_pMQClientFactory->getMQClientAPIImpl()->viewMessage(
-            socketAddress2String(messageId.getAddress()), messageId.getOffset(), 1000 * 3);
-    }
-    catch (const UnknownHostException&)
-    {
-        THROW_MQEXCEPTION(MQClientException, "message id illegal", -1);
-    }
-}
-
-/**
- * Placeholder query: not implemented, all arguments are ignored.
- *
- * @return a QueryResult built from 0 and an empty message list.
- */
-QueryResult MQAdminImpl::queryMessage(const std::string& topic,
-                                      const std::string& key,
-                                      int maxNum, long long begin, long long end)
-{
-    // TODO: not implemented yet.
-    std::list<MessageExt*> noMessages;
-    QueryResult emptyResult(0, noMessages);
-    return emptyResult;
-}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/b9bb6cfa/rocketmq-client4cpp/src/MQAdminImpl.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/MQAdminImpl.h b/rocketmq-client4cpp/src/MQAdminImpl.h
deleted file mode 100755
index 907d61e..0000000
--- a/rocketmq-client4cpp/src/MQAdminImpl.h
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
-* Copyright (C) 2013 kangliqiang ,kangliq@163.com
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-#ifndef __MQADMINIMPL_H__
-#define __MQADMINIMPL_H__
-
-#include <string>
-#include <list>
-#include <set>
-#include <vector>
-
-#include "MessageExt.h"
-#include "QueryResult.h"
-
-namespace rmq
-{
- class MQClientFactory;
- class MessageQueue;
-
- /**
-  * Administrative operations (topic creation, queue and offset lookups,
-  * message lookup) carried out through an MQClientFactory.
-  */
- class MQAdminImpl
- {
- public:
-     MQAdminImpl(MQClientFactory* pMQClientFactory);
-     ~MQAdminImpl();
-
-     // Topic creation; the three-argument form uses a topicSysFlag of 0.
-     void createTopic(const std::string& key, const std::string& newTopic, int queueNum);
-     void createTopic(const std::string& key, const std::string& newTopic, int queueNum, int topicSysFlag);
-
-     // Queue lookups for a topic.
-     std::vector<MessageQueue>* fetchPublishMessageQueues(const std::string& topic);
-     std::set<MessageQueue>* fetchSubscribeMessageQueues(const std::string& topic);
-
-     // Per-queue values obtained from the broker that hosts the queue.
-     long long searchOffset(const MessageQueue& mq, long long timestamp);
-     long long maxOffset(const MessageQueue& mq);
-     long long minOffset(const MessageQueue& mq);
-     long long earliestMsgStoreTime(const MessageQueue& mq);
-
-     // Message lookups.
-     MessageExt* viewMessage(const std::string& msgId);
-     QueryResult queryMessage(const std::string& topic,
-                              const std::string& key,
-                              int maxNum,
-                              long long begin,
-                              long long end);
-
- private:
-     /** Factory supplied at construction. */
-     MQClientFactory* m_pMQClientFactory;
- };
-}
-
-#endif