You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/19 05:55:24 UTC
[pulsar] 05/06: [Python] Return MessageId in producer's synchronous
send method (#9287)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0f846309d5ff615de4ce7a10f656bc7b82b70d8d
Author: Yunze Xu <xy...@163.com>
AuthorDate: Mon Jan 25 10:42:49 2021 +0800
[Python] Return MessageId in producer's synchronous send method (#9287)
Fixes #9176
### Motivation
Currently Python producer's send method returns nothing. However, it should returns a `MessageId` at least.
### Modifications
- Add a new `Producer#send` API with an extra argument as the output argument of `MessageId`.
- Improve the `Producer#send` related documents to ensure these methods can link to each other in Doxygen-generated websites.
- Let Python client's `Producer#send` return a `MessageId`.
- Add related tests.
It should be noted that the current C++ `Producer::send`'s API design is strange and weird that it returns no `MessageId` explicitly. However it will set the input `Message`'s internal `MessageId` field and there're no document about it, as well as the tests.
This API design is extremely terrible. I think without looking into the source code, no one could guess that the input **const** `Message` argument will **modify** its internal field after `send` is completed. So I add a new `send` method as a substitute and mark the old `send` method deprecated.
(cherry picked from commit 0e6bbc8ef41fc38048cb95ef3b594c070268916b)
---
pulsar-client-cpp/include/pulsar/Producer.h | 23 +++++++++---
.../include/pulsar/ProducerConfiguration.h | 42 ++++++++++++++++++++++
pulsar-client-cpp/lib/Producer.cc | 11 ++++++
pulsar-client-cpp/python/pulsar/__init__.py | 6 ++--
pulsar-client-cpp/python/pulsar_test.py | 12 +++++++
pulsar-client-cpp/python/src/producer.cc | 8 +++--
pulsar-client-cpp/tests/ProducerTest.cc | 28 +++++++++++++--
7 files changed, 120 insertions(+), 10 deletions(-)
diff --git a/pulsar-client-cpp/include/pulsar/Producer.h b/pulsar-client-cpp/include/pulsar/Producer.h
index ae55093..6241fa2 100644
--- a/pulsar-client-cpp/include/pulsar/Producer.h
+++ b/pulsar-client-cpp/include/pulsar/Producer.h
@@ -50,7 +50,14 @@ class PULSAR_PUBLIC Producer {
const std::string& getProducerName() const;
/**
- * Publish a message on the topic associated with this Producer.
+ * @deprecated
+ * It's the same with send(const Message& msg, MessageId& messageId) except that MessageId will be stored
+ * in `msg` though `msg` is `const`.
+ */
+ Result send(const Message& msg);
+
+ /**
+ * Publish a message on the topic associated with this Producer and get the associated MessageId.
*
* This method will block until the message will be accepted and persisted
* by the broker. In case of errors, the client library will try to
@@ -61,11 +68,19 @@ class PULSAR_PUBLIC Producer {
*
* This method is equivalent to asyncSend() and wait until the callback is triggered.
*
- * @param msg message to publish
+ * @param [in] msg message to publish
+ * @param [out] messageId the message id assigned to the published message
* @return ResultOk if the message was published successfully
- * @return ResultWriteError if it wasn't possible to publish the message
+ * @return ResultTimeout if message was not sent successfully in ProducerConfiguration#getSendTimeout
+ * @return ResultProducerQueueIsFull if the outgoing messsage queue is full when
+ * ProducerConfiguration::getBlockIfQueueFull was false
+ * @return ResultMessageTooBig if message size is bigger than the maximum message size
+ * @return ResultAlreadyClosed if Producer was already closed when message was sent
+ * @return ResultCryptoError if ProducerConfiguration::isEncryptionEnabled returns true but the message
+ * was failed to encrypt
+ * @return ResultInvalidMessage if message's invalid, it's usually caused by resending the same Message
*/
- Result send(const Message& msg);
+ Result send(const Message& msg, MessageId& messageId);
/**
* Asynchronously publish a message on the topic associated with this Producer.
diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 5666551..e260ca9 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -107,7 +107,21 @@ class PULSAR_PUBLIC ProducerConfiguration {
*/
const SchemaInfo& getSchema() const;
+ /**
+ * The getter associated with getSendTimeout()
+ */
ProducerConfiguration& setSendTimeout(int sendTimeoutMs);
+
+ /**
+ * Get the send timeout is milliseconds.
+ *
+ * If a message is not acknowledged by the server before the sendTimeout expires, an error will be
+ * reported.
+ *
+ * If the timeout is zero, there will be no timeout.
+ *
+ * @return the send timeout in milliseconds (Default: 30000)
+ */
int getSendTimeout() const;
ProducerConfiguration& setInitialSequenceId(int64_t initialSequenceId);
@@ -159,7 +173,15 @@ class PULSAR_PUBLIC ProducerConfiguration {
ProducerConfiguration& setHashingScheme(const HashingScheme& scheme);
HashingScheme getHashingScheme() const;
+ /**
+ * The setter associated with getBlockIfQueueFull()
+ */
ProducerConfiguration& setBlockIfQueueFull(bool);
+
+ /**
+ * @return whether Producer::send or Producer::sendAsync operations should block when the outgoing message
+ * queue is full. (Default: false)
+ */
bool getBlockIfQueueFull() const;
// Zero queue size feature will not be supported on consumer end if batching is enabled
@@ -188,8 +210,28 @@ class PULSAR_PUBLIC ProducerConfiguration {
ProducerCryptoFailureAction getCryptoFailureAction() const;
ProducerConfiguration& setCryptoFailureAction(ProducerCryptoFailureAction action);
+ /**
+ * @return all the encryption keys added
+ */
const std::set<std::string>& getEncryptionKeys() const;
+
+ /**
+ * @return true if encryption keys are added
+ */
bool isEncryptionEnabled() const;
+
+ /**
+ * Add public encryption key, used by producer to encrypt the data key.
+ *
+ * At the time of producer creation, Pulsar client checks if there are keys added to encryptionKeys. If
+ * keys are found, a callback getKey(String keyName) is invoked against each key to load the values of the
+ * key. Application should implement this callback to return the key in pkcs8 format. If compression is
+ * enabled, message is encrypted after compression. If batch messaging is enabled, the batched message is
+ * encrypted.
+ *
+ * @key the encryption key to add
+ * @return the ProducerConfiguration self
+ */
ProducerConfiguration& addEncryptionKey(std::string key);
/**
diff --git a/pulsar-client-cpp/lib/Producer.cc b/pulsar-client-cpp/lib/Producer.cc
index 7c7e53c..729577c 100644
--- a/pulsar-client-cpp/lib/Producer.cc
+++ b/pulsar-client-cpp/lib/Producer.cc
@@ -49,6 +49,17 @@ Result Producer::send(const Message& msg) {
return result;
}
+Result Producer::send(const Message& msg, MessageId& messageId) {
+ Promise<Result, MessageId> promise;
+ sendAsync(msg, WaitForCallbackValue<MessageId>(promise));
+
+ if (!promise.isComplete()) {
+ impl_->triggerFlush();
+ }
+
+ return promise.getFuture().get(messageId);
+}
+
void Producer::sendAsync(const Message& msg, SendCallback callback) {
if (!impl_) {
callback(ResultProducerNotInitialized, msg.getMessageId());
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py
index c3c610a..cfae7e0 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -853,6 +853,8 @@ class Producer:
"""
Publish a message on the topic. Blocks until the message is acknowledged
+ Returns a `MessageId` object that represents where the message is persisted.
+
**Args**
* `content`:
@@ -887,7 +889,7 @@ class Producer:
msg = self._build_msg(content, properties, partition_key, sequence_id,
replication_clusters, disable_replication, event_timestamp,
deliver_at, deliver_after)
- return self._producer.send(msg)
+ return MessageId.deserialize(self._producer.send(msg))
def send_async(self, content, callback,
properties=None,
@@ -1000,7 +1002,7 @@ class Producer:
mb.deliver_at(deliver_at)
if deliver_after:
mb.deliver_after(deliver_after)
-
+
return mb.build()
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index 2ea940e..f056832 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -122,6 +122,18 @@ class PulsarTest(TestCase):
self.assertEqual(len(sent_messages), 3)
client.close()
+ def test_producer_send(self):
+ client = Client(self.serviceUrl)
+ topic = 'test_producer_send'
+ producer = client.create_producer(topic)
+ consumer = client.subscribe(topic, 'sub-name')
+ msg_id = producer.send(b'hello')
+ print('send to {}'.format(msg_id))
+ msg = consumer.receive(TM)
+ consumer.acknowledge(msg)
+ print('receive from {}'.format(msg.message_id()))
+ self.assertEqual(msg_id, msg.message_id())
+
def test_producer_consumer(self):
client = Client(self.serviceUrl)
consumer = client.subscribe('my-python-topic-producer-consumer',
diff --git a/pulsar-client-cpp/python/src/producer.cc b/pulsar-client-cpp/python/src/producer.cc
index 3be5417..c50eac1 100644
--- a/pulsar-client-cpp/python/src/producer.cc
+++ b/pulsar-client-cpp/python/src/producer.cc
@@ -20,13 +20,17 @@
#include <functional>
-void Producer_send(Producer& producer, const Message& message) {
+extern boost::python::object MessageId_serialize(const MessageId& msgId);
+
+boost::python::object Producer_send(Producer& producer, const Message& message) {
Result res;
+ MessageId messageId;
Py_BEGIN_ALLOW_THREADS
- res = producer.send(message);
+ res = producer.send(message, messageId);
Py_END_ALLOW_THREADS
CHECK_RESULT(res);
+ return MessageId_serialize(messageId);
}
void Producer_sendAsyncCallback(PyObject* callback, Result res, const MessageId& msgId) {
diff --git a/pulsar-client-cpp/tests/ProducerTest.cc b/pulsar-client-cpp/tests/ProducerTest.cc
index 6769353..38f8d32 100644
--- a/pulsar-client-cpp/tests/ProducerTest.cc
+++ b/pulsar-client-cpp/tests/ProducerTest.cc
@@ -19,8 +19,10 @@
#include <pulsar/Client.h>
#include <gtest/gtest.h>
-#include "../lib/Future.h"
-#include "../lib/Utils.h"
+#include "lib/Future.h"
+#include "lib/Utils.h"
+#include "lib/LogUtils.h"
+DECLARE_LOG_OBJECT()
using namespace pulsar;
@@ -71,3 +73,25 @@ TEST(ProducerTest, exactlyOnceWithProducerNameSpecified) {
Result result = client.createProducer(topicName, producerConfiguration2, producer3);
ASSERT_EQ(ResultProducerBusy, result);
}
+
+TEST(ProducerTest, testSynchronouslySend) {
+ Client client(serviceUrl);
+ const std::string topic = "ProducerTestSynchronouslySend";
+
+ Consumer consumer;
+ ASSERT_EQ(ResultOk, client.subscribe(topic, "sub-name", consumer));
+
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+ MessageId messageId;
+ ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("hello").build(), messageId));
+ LOG_INFO("Send message to " << messageId);
+
+ Message receivedMessage;
+ ASSERT_EQ(ResultOk, consumer.receive(receivedMessage, 3000));
+ LOG_INFO("Received message from " << receivedMessage.getMessageId());
+ ASSERT_EQ(receivedMessage.getMessageId(), messageId);
+ ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMessage));
+
+ client.close();
+}