You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/12/11 10:31:59 UTC

[GitHub] vongosling closed pull request #27: Change PullConsumer model to CLUSTER & format example code style

vongosling closed pull request #27: Change PullConsumer model to CLUSTER & format example code style
URL: https://github.com/apache/rocketmq-client-cpp/pull/27
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not show its diff
after the merge, so the diff is reproduced below:

diff --git a/.gitignore b/.gitignore
index e1f4b40a..5693999b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,6 @@
 .idea
 cmake-build-debug/
-bin 
-libs
-
+bin
+build
+libs/signature/lib
 tmp_*
diff --git a/example/Producer.c b/example/Producer.c
index 5888fc8b..cef8383c 100644
--- a/example/Producer.c
+++ b/example/Producer.c
@@ -14,9 +14,7 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-#ifndef WIN32
-#include <unistd.h>
-#endif
+
 #include <stdio.h>
 
 #include "CProducer.h"
@@ -24,35 +22,47 @@
 #include "CMessage.h"
 #include "CSendResult.h"
 
-void startSendMessage(CProducer* producer)
-{
+#ifdef _WIN32
+#include <windows.h>
+#else
+
+#include <unistd.h>
+#include <memory.h>
+
+#endif
+
+void thread_sleep(unsigned milliseconds) {
+#ifdef _WIN32
+    Sleep(milliseconds);
+#else
+    usleep(milliseconds * 1000);  // takes microseconds
+#endif
+}
+
+void startSendMessage(CProducer *producer) {
     int i = 0;
     char DestMsg[256];
-    CMessage* msg = CreateMessage("T_TestTopic");
-    SetMessageTags(msg,"Test_Tag");
-    SetMessageKeys(msg,"Test_Keys");
+    CMessage *msg = CreateMessage("T_TestTopic");
+    SetMessageTags(msg, "Test_Tag");
+    SetMessageKeys(msg, "Test_Keys");
     CSendResult result;
-    for( i=0; i<10; i++)
-    {
-        printf("send one message : %d\n",i);
-	memset(DestMsg,0,sizeof(DestMsg));
-        snprintf(DestMsg,255,"New message body: index %d",i);
-        SetMessageBody(msg,DestMsg);
+    for (i = 0; i < 10; i++) {
+        printf("send one message : %d\n", i);
+        memset(DestMsg, 0, sizeof(DestMsg));
+        snprintf(DestMsg, 255, "New message body: index %d", i);
+        SetMessageBody(msg, DestMsg);
         SendMessageSync(producer, msg, &result);
-        printf("Msg Send ID:%s\n",result.msgId);
-#ifndef WIN32
-        sleep(1);
-#endif
+        printf("Msg Send ID:%s\n", result.msgId);
+        thread_sleep(1000);
     }
 }
 
 
-int main(int argc,char * argv [ ])
-{
+int main(int argc, char *argv[]) {
     printf("Producer Initializing.....\n");
 
-    CProducer* producer = CreateProducer("Group_producer");
-    SetProducerNameServerAddress(producer,"172.17.0.2:9876");
+    CProducer *producer = CreateProducer("Group_producer");
+    SetProducerNameServerAddress(producer, "172.17.0.2:9876");
     StartProducer(producer);
     printf("Producer start.....\n");
     startSendMessage(producer);
diff --git a/example/PullConsumeMessage.c b/example/PullConsumeMessage.c
index 5890cbc5..44e2abdb 100644
--- a/example/PullConsumeMessage.c
+++ b/example/PullConsumeMessage.c
@@ -14,12 +14,8 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-#ifndef WIN32
-#include <unistd.h>
-#endif
-#include <stdio.h>
-
 
+#include <stdio.h>
 
 #include "CPullConsumer.h"
 #include "CCommon.h"
@@ -27,6 +23,22 @@
 #include "CPullResult.h"
 #include "CMessageQueue.h"
 
+#ifdef _WIN32
+#include <windows.h>
+#else
+
+#include <unistd.h>
+#include <memory.h>
+
+#endif
+
+void thread_sleep(unsigned milliseconds) {
+#ifdef _WIN32
+    Sleep(milliseconds);
+#else
+    usleep(milliseconds * 1000);  // takes microseconds
+#endif
+}
 
 int main(int argc,char * argv [])
 {
@@ -39,7 +51,7 @@ int main(int argc,char * argv [])
     for( i=0; i<10; i++)
     {
         printf("Now Running : %d S\n",i*10);
-        sleep(10);
+        thread_sleep(10000);
     }
     ShutdownPullConsumer(consumer);
     DestroyPullConsumer(consumer);
diff --git a/example/PushConsumeMessage.c b/example/PushConsumeMessage.c
index 6ab19066..0e2906d2 100644
--- a/example/PushConsumeMessage.c
+++ b/example/PushConsumeMessage.c
@@ -14,48 +14,52 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-#ifndef WIN32
-#include <unistd.h>
-#else
-#include <windows.h>
-void sleep(int interval) {
-	Sleep(interval * 10);
-}
-#endif
-#include <stdio.h>
-
 
+#include <stdio.h>
 
 #include "CPushConsumer.h"
 #include "CCommon.h"
 #include "CMessageExt.h"
 
+#ifdef _WIN32
+#include <windows.h>
+#else
+
+#include <unistd.h>
+#include <memory.h>
+
+#endif
+
+void thread_sleep(unsigned milliseconds) {
+#ifdef _WIN32
+    Sleep(milliseconds);
+#else
+    usleep(milliseconds * 1000);  // takes microseconds
+#endif
+}
 
-int doConsumeMessage(struct CPushConsumer * consumer, CMessageExt * msgExt)
-{
+int doConsumeMessage(struct CPushConsumer *consumer, CMessageExt *msgExt) {
     printf("Hello,doConsumeMessage by Application!\n");
-    printf("Msg Topic:%s\n",GetMessageTopic(msgExt));
-    printf("Msg Tags:%s\n",GetMessageTags(msgExt));
-    printf("Msg Keys:%s\n",GetMessageKeys(msgExt));
-    printf("Msg Body:%s\n",GetMessageBody(msgExt));
+    printf("Msg Topic:%s\n", GetMessageTopic(msgExt));
+    printf("Msg Tags:%s\n", GetMessageTags(msgExt));
+    printf("Msg Keys:%s\n", GetMessageKeys(msgExt));
+    printf("Msg Body:%s\n", GetMessageBody(msgExt));
     return E_CONSUME_SUCCESS;
 }
 
 
-int main(int argc,char * argv [])
-{
+int main(int argc, char *argv[]) {
     int i = 0;
     printf("PushConsumer Initializing....\n");
-    CPushConsumer* consumer = CreatePushConsumer("Group_Consumer_Test");
-    SetPushConsumerNameServerAddress(consumer,"172.17.0.2:9876");
-    Subscribe(consumer,"T_TestTopic","*");
-    RegisterMessageCallback(consumer,doConsumeMessage);
+    CPushConsumer *consumer = CreatePushConsumer("Group_Consumer_Test");
+    SetPushConsumerNameServerAddress(consumer, "172.17.0.2:9876");
+    Subscribe(consumer, "T_TestTopic", "*");
+    RegisterMessageCallback(consumer, doConsumeMessage);
     StartPushConsumer(consumer);
     printf("Push Consumer Start...\n");
-    for( i=0; i<10; i++)
-    {
-        printf("Now Running : %d S\n",i*10);
-        sleep(10);
+    for (i = 0; i < 10; i++) {
+        printf("Now Running : %d S\n", i * 10);
+        thread_sleep(10000);
     }
     ShutdownPushConsumer(consumer);
     DestroyPushConsumer(consumer);
diff --git a/src/consumer/DefaultMQPullConsumer.cpp b/src/consumer/DefaultMQPullConsumer.cpp
index ff5fbbba..4aa33f65 100755
--- a/src/consumer/DefaultMQPullConsumer.cpp
+++ b/src/consumer/DefaultMQPullConsumer.cpp
@@ -43,7 +43,7 @@ DefaultMQPullConsumer::DefaultMQPullConsumer(const string& groupname)
   string gname = groupname.empty() ? DEFAULT_CONSUMER_GROUP : groupname;
   setGroupName(gname);
 
-  setMessageModel(BROADCASTING);
+  setMessageModel(CLUSTERING);
 }
 
 DefaultMQPullConsumer::~DefaultMQPullConsumer() {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services