You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/08/09 03:19:53 UTC
[pulsar] branch master updated: [client][python] getLastMessageIdAsync C binding (#16255)
This is an automated email from the ASF dual-hosted git repository.
mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fd8ebaa522b [client][python] getLastMessageIdAsync C binding (#16255)
fd8ebaa522b is described below
commit fd8ebaa522bb8e7525be249e34913375c9c6f236
Author: komalatammal <10...@users.noreply.github.com>
AuthorDate: Mon Aug 8 23:19:42 2022 -0400
[client][python] getLastMessageIdAsync C binding (#16255)
* python cc binding for getLastMessageId
* add python Consumer class method and doc
* fix linter issues based on clang-format
* ubuntu linter fix
* try run unit test in ci
* fix doc comment
* test the test case can be run
### Motivation
Python function getLastMessageId
It is a C binding for https://github.com/apache/pulsar/pull/16182 to implement get_last_message_id() in Python client.
### Modifications
Add Python/C binding code for get_last_message_id()
### Verifying this change
It compiles.
- [x] Make sure that the change passes the CI checks.
This change is a trivial rework / code cleanup without any test coverage.
### Does this pull request potentially affect one of the following parts:
*If `yes` was chosen, please highlight the changes*
- Dependencies (does it add or upgrade a dependency): (no)
- The public API: (yes)
- The schema: (no)
- The default values of configurations: (no)
- The wire protocol: (no)
- The rest endpoints: (no)
- The admin cli options: (no)
- Anything that affects deployment: (no)
### Documentation
Check the box below or label this PR directly.
Need to update docs?
- [ ] `doc-required`
(Your PR needs to update docs and you will update later)
- [ ] `doc-not-needed`
- [x] `doc`
Python Doc is updated in __init__.py
- [ ] `doc-complete`
(Docs have been already added)
---
pulsar-client-cpp/README.md | 2 +-
pulsar-client-cpp/include/pulsar/c/consumer.h | 3 +++
pulsar-client-cpp/lib/c/c_Consumer.cc | 5 +++++
pulsar-client-cpp/python/pulsar/__init__.py | 7 ++++++-
pulsar-client-cpp/python/pulsar_test.py | 12 ++++++++++++
pulsar-client-cpp/python/src/consumer.cc | 13 ++++++++++++-
6 files changed, 39 insertions(+), 3 deletions(-)
diff --git a/pulsar-client-cpp/README.md b/pulsar-client-cpp/README.md
index 3dfa169c923..155e6c6a907 100644
--- a/pulsar-client-cpp/README.md
+++ b/pulsar-client-cpp/README.md
@@ -281,7 +281,7 @@ ${PULSAR_PATH}/pulsar-test-service-stop.sh
## Requirements for Contributors
-It's recommended to install [LLVM](https://llvm.org/builds/) for `clang-tidy` and `clang-format`. Pulsar C++ client use `clang-format` 6.0+ to format files.
+It's required to install [LLVM](https://llvm.org/builds/) for `clang-tidy` and `clang-format`. The Pulsar C++ client uses `clang-format` 6.0+ to format files. `make format` automatically formats the files.
Use `pulsar-client-cpp/docker-format.sh` to ensure the C++ sources are correctly formatted.
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer.h b/pulsar-client-cpp/include/pulsar/c/consumer.h
index 03f80f32394..37fd2acf5b8 100644
--- a/pulsar-client-cpp/include/pulsar/c/consumer.h
+++ b/pulsar-client-cpp/include/pulsar/c/consumer.h
@@ -236,6 +236,9 @@ PULSAR_PUBLIC pulsar_result pulsar_consumer_seek(pulsar_consumer_t *consumer, pu
PULSAR_PUBLIC int pulsar_consumer_is_connected(pulsar_consumer_t *consumer);
+/**
+ * Get the last message id for the given consumer.
+ *
+ * @param consumer the consumer handle to query
+ * @param messageId message id handle; its wrapped id is handed to the C++
+ *        consumer's getLastMessageId() (used there as the output argument)
+ * @return the result of the underlying C++ call, converted to pulsar_result
+ *         NOTE(review): presumably pulsar_result_Ok on success - confirm
+ */
+PULSAR_PUBLIC pulsar_result pulsar_consumer_get_last_message_id(pulsar_consumer_t *consumer,
+ pulsar_message_id_t *messageId);
+
#ifdef __cplusplus
}
#endif
diff --git a/pulsar-client-cpp/lib/c/c_Consumer.cc b/pulsar-client-cpp/lib/c/c_Consumer.cc
index 9917e8cfad6..00d8311f132 100644
--- a/pulsar-client-cpp/lib/c/c_Consumer.cc
+++ b/pulsar-client-cpp/lib/c/c_Consumer.cc
@@ -143,3 +143,8 @@ pulsar_result pulsar_consumer_seek(pulsar_consumer_t *consumer, pulsar_message_i
}
int pulsar_consumer_is_connected(pulsar_consumer_t *consumer) { return consumer->consumer.isConnected(); }
+
+/**
+ * C binding for the consumer's getLastMessageId().
+ *
+ * Forwards to the wrapped C++ consumer, passing the C++ message id held inside
+ * `messageId`, and returns the C++ result converted to the C `pulsar_result` enum.
+ * Neither pointer is checked for null here.
+ */
+pulsar_result pulsar_consumer_get_last_message_id(pulsar_consumer_t *consumer,
+                                                  pulsar_message_id_t *messageId) {
+    const auto result = consumer->consumer.getLastMessageId(messageId->messageId);
+    return static_cast<pulsar_result>(result);
+}
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py
index e79955b57dd..3832c3e69e2 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -1253,7 +1253,12 @@ class Consumer:
Check if the consumer is connected or not.
"""
return self._consumer.is_connected()
-
+
+ def get_last_message_id(self):
+ """
+ Get the last message id.
+ """
+ return self._consumer.get_last_message_id()
class Reader:
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index dbdd6be59c7..127ecc4247c 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -753,6 +753,18 @@ class PulsarTest(TestCase):
self._check_value_error(lambda: client.create_reader(topic, MessageId.earliest, reader_name=5))
client.close()
+ def test_get_last_message_id(self):
+ client = Client(self.serviceUrl)
+ consumer = client.subscribe(
+ "persistent://public/default/topic_name_test", "topic_name_test_sub", consumer_type=ConsumerType.Shared
+ )
+ producer = client.create_producer("persistent://public/default/topic_name_test")
+ msg_id = producer.send(b"hello")
+
+ msg = consumer.receive(TM)
+ self.assertEqual(msg.message_id(), msg_id)
+ client.close()
+
def test_publish_compact_and_consume(self):
client = Client(self.serviceUrl)
topic = "compaction_%s" % (uuid.uuid4())
diff --git a/pulsar-client-cpp/python/src/consumer.cc b/pulsar-client-cpp/python/src/consumer.cc
index 10ffd07496f..811ceb3ddf5 100644
--- a/pulsar-client-cpp/python/src/consumer.cc
+++ b/pulsar-client-cpp/python/src/consumer.cc
@@ -83,6 +83,16 @@ void Consumer_seek_timestamp(Consumer& consumer, uint64_t timestamp) {
bool Consumer_is_connected(Consumer& consumer) { return consumer.isConnected(); }
+// Python-facing wrapper around Consumer::getLastMessageId().
+// Returns the message id filled in by the C++ call; the result code is passed
+// to CHECK_RESULT before returning.
+// NOTE(review): CHECK_RESULT presumably raises a Python exception on a
+// non-OK result - confirm against its definition.
+MessageId Consumer_get_last_message_id(Consumer& consumer) {
+    MessageId lastMessageId;
+    Result result;
+
+    // The call runs between Py_BEGIN/END_ALLOW_THREADS, i.e. with the GIL released.
+    Py_BEGIN_ALLOW_THREADS
+    result = consumer.getLastMessageId(lastMessageId);
+    Py_END_ALLOW_THREADS
+
+    CHECK_RESULT(result);
+    return lastMessageId;
+}
+
void export_consumer() {
using namespace boost::python;
@@ -105,5 +115,6 @@ void export_consumer() {
.def("redeliver_unacknowledged_messages", &Consumer::redeliverUnacknowledgedMessages)
.def("seek", &Consumer_seek)
.def("seek", &Consumer_seek_timestamp)
- .def("is_connected", &Consumer_is_connected);
+ .def("is_connected", &Consumer_is_connected)
+ .def("get_last_message_id", &Consumer_get_last_message_id);
}