You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/04 02:54:18 UTC

[GitHub] [pulsar] equanz opened a new pull request, #17452: [feat][c] Add async receive function to C API

equanz opened a new pull request, #17452:
URL: https://github.com/apache/pulsar/pull/17452

   <!--
   ### Contribution Checklist
     
     - PR title format should be *[type][component] summary*. For details, see *[Guideline - Pulsar PR Naming Convention](https://docs.google.com/document/d/1d8Pw6ZbWk-_pCKdOmdvx9rnhPiyuxwq60_TrD68d7BA/edit#heading=h.trs9rsex3xom)*. 
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
     
     - Each pull request should address only one issue, not mix up code from multiple issues.
     
     - Each commit in the pull request has a meaningful commit message
   
     - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.
   
   **(The sections below can be removed for hotfixes of typos)**
   -->
   
   Master Issue: [#14452](https://github.com/apache/pulsar/issues/14452)
   
   ### Motivation
   
   I want to rewrite the pulsar-client-node `receive` method to rely exclusively on async operations rather than worker threads.
   However, an async receive function is not defined currently in C API.
   
   ### Modifications
   
   * Define `pulsar_consumer_receive_async` and `pulsar_receive_callback` to use async receive operation in C API
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is already covered by existing tests, such as *C++ one*.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (yes)
       - https://github.com/apache/pulsar/blob/7a975c82994b365955f30fcd2b6fcc24793f564f/pulsar-client-cpp/include/pulsar/c/consumer.h#L36
       - https://github.com/apache/pulsar/blob/7a975c82994b365955f30fcd2b6fcc24793f564f/pulsar-client-cpp/include/pulsar/c/consumer.h#L100-L108
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [x] `doc-not-needed`
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] tisonkun commented on pull request #17452: [feat][c] Add async receive function to C API

Posted by GitBox <gi...@apache.org>.
tisonkun commented on PR #17452:
URL: https://github.com/apache/pulsar/pull/17452#issuecomment-1236249587

   cc @BewareMyPower @shibd @RobertIndie 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] equanz commented on a diff in pull request #17452: [feat][cpp] Add async receive function to C API

Posted by GitBox <gi...@apache.org>.
equanz commented on code in PR #17452:
URL: https://github.com/apache/pulsar/pull/17452#discussion_r964357492


##########
pulsar-client-cpp/lib/c/c_Consumer.cc:
##########
@@ -60,6 +60,20 @@ pulsar_result pulsar_consumer_receive_with_timeout(pulsar_consumer_t *consumer,
     return (pulsar_result)res;
 }
 
+static void handle_receive_callback(pulsar::Result result, pulsar::Message message,
+                                    pulsar_receive_callback callback, void *ctx) {
+    if (callback) {
+        pulsar_message_t *msg = new pulsar_message_t;
+        msg->message = message;
+        callback((pulsar_result)result, msg, ctx);

Review Comment:
   In my understanding, other async functions are also defined by the current approach.
   https://github.com/apache/pulsar/blob/a7f1a5657782ff7ac91a89afdd49568d74d111f5/pulsar-client-cpp/lib/c/c_Producer.cc#L39-L55
   
   I think it is better not to change semantics in the same C API.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] equanz commented on a diff in pull request #17452: [feat][cpp] Add async receive function to C API

Posted by GitBox <gi...@apache.org>.
equanz commented on code in PR #17452:
URL: https://github.com/apache/pulsar/pull/17452#discussion_r964356319


##########
pulsar-client-cpp/tests/c/c_BasicEndToEndTest.cc:
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <gtest/gtest.h>
+#include <pulsar/c/client.h>
+#include <pulsar/c/producer.h>
+#include <pulsar/c/consumer.h>
+#include <pulsar/c/client_configuration.h>
+#include <pulsar/c/consumer_configuration.h>
+#include <pulsar/c/message.h>
+#include <pulsar/c/message_id.h>
+#include <pulsar/c/result.h>
+
+#include "../lib/Future.h"
+
+TEST(c_BasicEndToEndTest, testAsyncProduceConsume) {
+    const char *lookup_url = "pulsar://localhost:6650";
+    const char *topic_name = "persistent://public/default/test-c-produce-consume";
+    const char *sub_name = "my-sub-name";
+
+    pulsar_client_configuration_t *conf = pulsar_client_configuration_create();
+    pulsar_client_t *client = pulsar_client_create(lookup_url, conf);
+
+    pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create();
+    pulsar_producer_t *producer;
+    pulsar_result result = pulsar_client_create_producer(client, topic_name, producer_conf, &producer);
+    ASSERT_EQ(pulsar_result_Ok, result);
+
+    pulsar_consumer_configuration_t *consumer_conf = pulsar_consumer_configuration_create();
+    pulsar_consumer_t *consumer;
+    result = pulsar_client_subscribe(client, topic_name, sub_name, consumer_conf, &consumer);
+    ASSERT_EQ(pulsar_result_Ok, result);
+
+    ASSERT_STREQ(topic_name, pulsar_producer_get_topic(producer));
+    ASSERT_STREQ(topic_name, pulsar_consumer_get_topic(consumer));
+    ASSERT_STREQ(sub_name, pulsar_consumer_get_subscription_name(consumer));
+
+    // send asynchronously
+    pulsar::Promise<pulsar_result, pulsar_message_id_t *> send_promise;
+    const char *content = "msg-1-content";
+    pulsar_message_t *msg = pulsar_message_create();
+    pulsar_message_set_content(msg, content, strlen(content));
+    ASSERT_STREQ("(-1,-1,-1,-1)", pulsar_message_id_str(pulsar_message_get_message_id(msg)));
+    pulsar_producer_send_async(
+        producer, msg,
+        [](pulsar_result async_result, pulsar_message_id_t *msg_id, void *ctx) {
+            auto ctx_promise = static_cast<pulsar::Promise<pulsar_result, pulsar_message_id_t *> *>(ctx);
+            ASSERT_EQ(pulsar_result_Ok, async_result);
+            ctx_promise->setValue(msg_id);
+        },
+        &send_promise);
+
+    pulsar_message_id_t *msg_id;
+    send_promise.getFuture().get(msg_id);
+    ASSERT_STRNE("(-1,-1,-1,-1)", pulsar_message_id_str(msg_id));
+    pulsar_message_id_free(msg_id);
+    pulsar_message_free(msg);
+
+    pulsar::Promise<pulsar_result, pulsar_message_t *> receive_promise;
+    // receive asynchronously
+    pulsar_consumer_receive_async(
+        consumer,
+        [](pulsar_result async_result, pulsar_message_t *received_msg, void *ctx) {
+            auto ctx_promise = static_cast<pulsar::Promise<pulsar_result, pulsar_message_t *> *>(ctx);
+            ASSERT_EQ(pulsar_result_Ok, async_result);
+            ctx_promise->setValue(received_msg);
+        },
+        &receive_promise);
+
+    pulsar_message_t *received_msg;
+    receive_promise.getFuture().get(received_msg);
+    ASSERT_STREQ(content, static_cast<const char *>(pulsar_message_get_data(received_msg)));
+    pulsar_message_free(received_msg);

Review Comment:
   I'll modify it.
   
   However, I also use `pulsar::Promise` to wait for the callback completion in the test thread.
   To achieve this process, I'll use `std::future` instead of conditional wait, etc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] equanz commented on a diff in pull request #17452: [feat][cpp] Add async receive function to C API

Posted by GitBox <gi...@apache.org>.
equanz commented on code in PR #17452:
URL: https://github.com/apache/pulsar/pull/17452#discussion_r964464127


##########
pulsar-client-cpp/lib/c/c_Consumer.cc:
##########
@@ -60,6 +60,20 @@ pulsar_result pulsar_consumer_receive_with_timeout(pulsar_consumer_t *consumer,
     return (pulsar_result)res;
 }
 
+static void handle_receive_callback(pulsar::Result result, pulsar::Message message,
+                                    pulsar_receive_callback callback, void *ctx) {
+    if (callback) {
+        pulsar_message_t *msg = new pulsar_message_t;
+        msg->message = message;
+        callback((pulsar_result)result, msg, ctx);

Review Comment:
   Okay. I'll add an example about async consume in this PR. Also, I'll create an issue about other async functions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17452: [feat][cpp] Add async receive function to C API

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #17452:
URL: https://github.com/apache/pulsar/pull/17452#discussion_r964390078


##########
pulsar-client-cpp/lib/c/c_Consumer.cc:
##########
@@ -60,6 +60,20 @@ pulsar_result pulsar_consumer_receive_with_timeout(pulsar_consumer_t *consumer,
     return (pulsar_result)res;
 }
 
+static void handle_receive_callback(pulsar::Result result, pulsar::Message message,
+                                    pulsar_receive_callback callback, void *ctx) {
+    if (callback) {
+        pulsar_message_t *msg = new pulsar_message_t;
+        msg->message = message;
+        callback((pulsar_result)result, msg, ctx);

Review Comment:
   It makes sense to me. I think we should note it in the docs, or add it to examples.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] equanz commented on pull request #17452: [feat][c] Add async receive function to C API

Posted by GitBox <gi...@apache.org>.
equanz commented on PR #17452:
URL: https://github.com/apache/pulsar/pull/17452#issuecomment-1236474510

   @BewareMyPower 
   > Could you add a basic e2e test for demonstrate it works?
   
   Okay. I'll address it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower merged pull request #17452: [feat][cpp] Add async receive function to C API

Posted by GitBox <gi...@apache.org>.
BewareMyPower merged PR #17452:
URL: https://github.com/apache/pulsar/pull/17452


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] equanz commented on pull request #17452: [feat][c] Add async receive function to C API

Posted by GitBox <gi...@apache.org>.
equanz commented on PR #17452:
URL: https://github.com/apache/pulsar/pull/17452#issuecomment-1236262512

   /pulsarbot rerun-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] equanz commented on pull request #17452: [feat][c] Add async receive function to C API

Posted by GitBox <gi...@apache.org>.
equanz commented on PR #17452:
URL: https://github.com/apache/pulsar/pull/17452#issuecomment-1236286666

   /pulsarbot rerun-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17452: [feat][cpp] Add async receive function to C API

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #17452:
URL: https://github.com/apache/pulsar/pull/17452#discussion_r963018620


##########
pulsar-client-cpp/tests/c/c_BasicEndToEndTest.cc:
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <gtest/gtest.h>
+#include <pulsar/c/client.h>
+#include <pulsar/c/producer.h>
+#include <pulsar/c/consumer.h>
+#include <pulsar/c/client_configuration.h>
+#include <pulsar/c/consumer_configuration.h>
+#include <pulsar/c/message.h>
+#include <pulsar/c/message_id.h>
+#include <pulsar/c/result.h>
+
+#include "../lib/Future.h"
+
+TEST(c_BasicEndToEndTest, testAsyncProduceConsume) {
+    const char *lookup_url = "pulsar://localhost:6650";
+    const char *topic_name = "persistent://public/default/test-c-produce-consume";
+    const char *sub_name = "my-sub-name";
+
+    pulsar_client_configuration_t *conf = pulsar_client_configuration_create();
+    pulsar_client_t *client = pulsar_client_create(lookup_url, conf);
+
+    pulsar_producer_configuration_t *producer_conf = pulsar_producer_configuration_create();
+    pulsar_producer_t *producer;
+    pulsar_result result = pulsar_client_create_producer(client, topic_name, producer_conf, &producer);
+    ASSERT_EQ(pulsar_result_Ok, result);
+
+    pulsar_consumer_configuration_t *consumer_conf = pulsar_consumer_configuration_create();
+    pulsar_consumer_t *consumer;
+    result = pulsar_client_subscribe(client, topic_name, sub_name, consumer_conf, &consumer);
+    ASSERT_EQ(pulsar_result_Ok, result);
+
+    ASSERT_STREQ(topic_name, pulsar_producer_get_topic(producer));
+    ASSERT_STREQ(topic_name, pulsar_consumer_get_topic(consumer));
+    ASSERT_STREQ(sub_name, pulsar_consumer_get_subscription_name(consumer));
+
+    // send asynchronously
+    pulsar::Promise<pulsar_result, pulsar_message_id_t *> send_promise;
+    const char *content = "msg-1-content";
+    pulsar_message_t *msg = pulsar_message_create();
+    pulsar_message_set_content(msg, content, strlen(content));
+    ASSERT_STREQ("(-1,-1,-1,-1)", pulsar_message_id_str(pulsar_message_get_message_id(msg)));
+    pulsar_producer_send_async(
+        producer, msg,
+        [](pulsar_result async_result, pulsar_message_id_t *msg_id, void *ctx) {
+            auto ctx_promise = static_cast<pulsar::Promise<pulsar_result, pulsar_message_id_t *> *>(ctx);
+            ASSERT_EQ(pulsar_result_Ok, async_result);
+            ctx_promise->setValue(msg_id);
+        },
+        &send_promise);
+
+    pulsar_message_id_t *msg_id;
+    send_promise.getFuture().get(msg_id);
+    ASSERT_STRNE("(-1,-1,-1,-1)", pulsar_message_id_str(msg_id));
+    pulsar_message_id_free(msg_id);
+    pulsar_message_free(msg);
+
+    pulsar::Promise<pulsar_result, pulsar_message_t *> receive_promise;
+    // receive asynchronously
+    pulsar_consumer_receive_async(
+        consumer,
+        [](pulsar_result async_result, pulsar_message_t *received_msg, void *ctx) {
+            auto ctx_promise = static_cast<pulsar::Promise<pulsar_result, pulsar_message_t *> *>(ctx);
+            ASSERT_EQ(pulsar_result_Ok, async_result);
+            ctx_promise->setValue(received_msg);
+        },
+        &receive_promise);
+
+    pulsar_message_t *received_msg;
+    receive_promise.getFuture().get(received_msg);
+    ASSERT_STREQ(content, static_cast<const char *>(pulsar_message_get_data(received_msg)));
+    pulsar_message_free(received_msg);

Review Comment:
   I think we should release the `received_msg` after the `callback` is done. It's better to use pure C semantics here, i.e. use C functions instead of lambda, and do not use `pulsar::Promise`, we should define a C struct and pass it to `ctx`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #17452: [feat][cpp] Add async receive function to C API

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #17452:
URL: https://github.com/apache/pulsar/pull/17452#discussion_r963024003


##########
pulsar-client-cpp/lib/c/c_Consumer.cc:
##########
@@ -60,6 +60,20 @@ pulsar_result pulsar_consumer_receive_with_timeout(pulsar_consumer_t *consumer,
     return (pulsar_result)res;
 }
 
+static void handle_receive_callback(pulsar::Result result, pulsar::Message message,
+                                    pulsar_receive_callback callback, void *ctx) {
+    if (callback) {
+        pulsar_message_t *msg = new pulsar_message_t;
+        msg->message = message;
+        callback((pulsar_result)result, msg, ctx);

Review Comment:
   I think it's better to delete `pulsar_message_t` here and don't leave users to call `pulsar_message_free`. We should encourage users to call functions with `pulsar_message_` prefix in the callback and pass the results to `ctx`.
   
   For a C user, it's more natural to write code like:
   
   ```c++
   struct receive_ctx {
       pulsar_result result;
       const char *value;
       uint32_t length;
   };
   
   static void receive_callback(pulsar_result result, pulsar_message_t *msg, void *ctx) {
       struct receive_ctx *receive_ctx = (struct receive_ctx *)ctx;
       receive_ctx->result = result;
       if (result == pulsar_result_Ok) {
           receive_ctx->value = (const char *)pulsar_message_get_data(msg);
           receive_ctx->length = pulsar_message_get_length(msg);
       } else {
           receive_ctx->value = NULL;
           receive_ctx->length = 0;
       }
   }
   ```
   
   But for the current design, he must save the `msg` in the `ctx` or a global variable, then call `pulsar_message_free` after that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org