You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/19 05:55:24 UTC

[pulsar] 05/06: [Python] Return MessageId in producer's synchronous send method (#9287)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0f846309d5ff615de4ce7a10f656bc7b82b70d8d
Author: Yunze Xu <xy...@163.com>
AuthorDate: Mon Jan 25 10:42:49 2021 +0800

    [Python] Return MessageId in producer's synchronous send method (#9287)
    
    Fixes #9176
    
    ### Motivation
    
    Currently the Python producer's `send` method returns nothing. However, it should return at least a `MessageId`.
    
    ### Modifications
    
    - Add a new `Producer#send` API with an extra argument as the output argument of `MessageId`.
    - Improve the `Producer#send` related documents to ensure these methods can link to each other in Doxygen-generated websites.
    - Let Python client's `Producer#send` return a `MessageId`.
    - Add related tests.
    
    It should be noted that the design of the current C++ `Producer::send` API is odd in that it does not return a `MessageId` explicitly. Instead, it sets the internal `MessageId` field of the input `Message`, and this behavior is neither documented nor covered by tests.
    
    This API design is extremely poor. Without looking into the source code, no one could guess that the input **const** `Message` argument will have its internal field **modified** after `send` completes. So I added a new `send` method as a substitute and marked the old `send` method as deprecated.
    
    (cherry picked from commit 0e6bbc8ef41fc38048cb95ef3b594c070268916b)
---
 pulsar-client-cpp/include/pulsar/Producer.h        | 23 +++++++++---
 .../include/pulsar/ProducerConfiguration.h         | 42 ++++++++++++++++++++++
 pulsar-client-cpp/lib/Producer.cc                  | 11 ++++++
 pulsar-client-cpp/python/pulsar/__init__.py        |  6 ++--
 pulsar-client-cpp/python/pulsar_test.py            | 12 +++++++
 pulsar-client-cpp/python/src/producer.cc           |  8 +++--
 pulsar-client-cpp/tests/ProducerTest.cc            | 28 +++++++++++++--
 7 files changed, 120 insertions(+), 10 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/Producer.h b/pulsar-client-cpp/include/pulsar/Producer.h
index ae55093..6241fa2 100644
--- a/pulsar-client-cpp/include/pulsar/Producer.h
+++ b/pulsar-client-cpp/include/pulsar/Producer.h
@@ -50,7 +50,14 @@ class PULSAR_PUBLIC Producer {
     const std::string& getProducerName() const;
 
     /**
-     * Publish a message on the topic associated with this Producer.
+     * @deprecated
+     * It's the same as send(const Message& msg, MessageId& messageId) except that MessageId will be stored
+     * in `msg` even though `msg` is `const`.
+     */
+    Result send(const Message& msg);
+
+    /**
+     * Publish a message on the topic associated with this Producer and get the associated MessageId.
      *
      * This method will block until the message will be accepted and persisted
      * by the broker. In case of errors, the client library will try to
@@ -61,11 +68,19 @@ class PULSAR_PUBLIC Producer {
      *
      * This method is equivalent to asyncSend() and wait until the callback is triggered.
      *
-     * @param msg message to publish
+     * @param [in] msg message to publish
+     * @param [out] messageId the message id assigned to the published message
      * @return ResultOk if the message was published successfully
-     * @return ResultWriteError if it wasn't possible to publish the message
+     * @return ResultTimeout if message was not sent successfully in ProducerConfiguration#getSendTimeout
+     * @return ResultProducerQueueIsFull if the outgoing message queue is full when
+     *   ProducerConfiguration::getBlockIfQueueFull was false
+     * @return ResultMessageTooBig if message size is bigger than the maximum message size
+     * @return ResultAlreadyClosed if Producer was already closed when message was sent
+     * @return ResultCryptoError if ProducerConfiguration::isEncryptionEnabled returns true but the message
+     *   failed to be encrypted
+     * @return ResultInvalidMessage if the message is invalid, usually caused by resending the same Message
      */
-    Result send(const Message& msg);
+    Result send(const Message& msg, MessageId& messageId);
 
     /**
      * Asynchronously publish a message on the topic associated with this Producer.
diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 5666551..e260ca9 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -107,7 +107,21 @@ class PULSAR_PUBLIC ProducerConfiguration {
      */
     const SchemaInfo& getSchema() const;
 
+    /**
+     * The setter associated with getSendTimeout()
+     */
     ProducerConfiguration& setSendTimeout(int sendTimeoutMs);
+
+    /**
+     * Get the send timeout in milliseconds.
+     *
+     * If a message is not acknowledged by the server before the sendTimeout expires, an error will be
+     * reported.
+     *
+     * If the timeout is zero, there will be no timeout.
+     *
+     * @return the send timeout in milliseconds (Default: 30000)
+     */
     int getSendTimeout() const;
 
     ProducerConfiguration& setInitialSequenceId(int64_t initialSequenceId);
@@ -159,7 +173,15 @@ class PULSAR_PUBLIC ProducerConfiguration {
     ProducerConfiguration& setHashingScheme(const HashingScheme& scheme);
     HashingScheme getHashingScheme() const;
 
+    /**
+     * The setter associated with getBlockIfQueueFull()
+     */
     ProducerConfiguration& setBlockIfQueueFull(bool);
+
+    /**
+     * @return whether Producer::send or Producer::sendAsync operations should block when the outgoing message
+     * queue is full. (Default: false)
+     */
     bool getBlockIfQueueFull() const;
 
     // Zero queue size feature will not be supported on consumer end if batching is enabled
@@ -188,8 +210,28 @@ class PULSAR_PUBLIC ProducerConfiguration {
     ProducerCryptoFailureAction getCryptoFailureAction() const;
     ProducerConfiguration& setCryptoFailureAction(ProducerCryptoFailureAction action);
 
+    /**
+     * @return all the encryption keys added
+     */
     const std::set<std::string>& getEncryptionKeys() const;
+
+    /**
+     * @return true if encryption keys are added
+     */
     bool isEncryptionEnabled() const;
+
+    /**
+     * Add public encryption key, used by producer to encrypt the data key.
+     *
+     * At the time of producer creation, Pulsar client checks if there are keys added to encryptionKeys. If
+     * keys are found, a callback getKey(String keyName) is invoked against each key to load the values of the
+     * key. Application should implement this callback to return the key in pkcs8 format. If compression is
+     * enabled, message is encrypted after compression. If batch messaging is enabled, the batched message is
+     * encrypted.
+     *
+     * @param key the encryption key to add
+     * @return the ProducerConfiguration self
+     */
     ProducerConfiguration& addEncryptionKey(std::string key);
 
     /**
diff --git a/pulsar-client-cpp/lib/Producer.cc b/pulsar-client-cpp/lib/Producer.cc
index 7c7e53c..729577c 100644
--- a/pulsar-client-cpp/lib/Producer.cc
+++ b/pulsar-client-cpp/lib/Producer.cc
@@ -49,6 +49,17 @@ Result Producer::send(const Message& msg) {
     return result;
 }
 
+Result Producer::send(const Message& msg, MessageId& messageId) {
+    Promise<Result, MessageId> promise;
+    sendAsync(msg, WaitForCallbackValue<MessageId>(promise));
+
+    if (!promise.isComplete()) {
+        impl_->triggerFlush();
+    }
+
+    return promise.getFuture().get(messageId);
+}
+
 void Producer::sendAsync(const Message& msg, SendCallback callback) {
     if (!impl_) {
         callback(ResultProducerNotInitialized, msg.getMessageId());
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py
index c3c610a..cfae7e0 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -853,6 +853,8 @@ class Producer:
         """
         Publish a message on the topic. Blocks until the message is acknowledged
 
+        Returns a `MessageId` object that represents where the message is persisted.
+
         **Args**
 
         * `content`:
@@ -887,7 +889,7 @@ class Producer:
         msg = self._build_msg(content, properties, partition_key, sequence_id,
                               replication_clusters, disable_replication, event_timestamp,
                               deliver_at, deliver_after)
-        return self._producer.send(msg)
+        return MessageId.deserialize(self._producer.send(msg))
 
     def send_async(self, content, callback,
                    properties=None,
@@ -1000,7 +1002,7 @@ class Producer:
             mb.deliver_at(deliver_at)
         if deliver_after:
             mb.deliver_after(deliver_after)
-        
+
         return mb.build()
 
 
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index 2ea940e..f056832 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -122,6 +122,18 @@ class PulsarTest(TestCase):
         self.assertEqual(len(sent_messages), 3)
         client.close()
 
+    def test_producer_send(self):
+        client = Client(self.serviceUrl)
+        topic = 'test_producer_send'
+        producer = client.create_producer(topic)
+        consumer = client.subscribe(topic, 'sub-name')
+        msg_id = producer.send(b'hello')
+        print('send to {}'.format(msg_id))
+        msg = consumer.receive(TM)
+        consumer.acknowledge(msg)
+        print('receive from {}'.format(msg.message_id()))
+        self.assertEqual(msg_id, msg.message_id())
+
     def test_producer_consumer(self):
         client = Client(self.serviceUrl)
         consumer = client.subscribe('my-python-topic-producer-consumer',
diff --git a/pulsar-client-cpp/python/src/producer.cc b/pulsar-client-cpp/python/src/producer.cc
index 3be5417..c50eac1 100644
--- a/pulsar-client-cpp/python/src/producer.cc
+++ b/pulsar-client-cpp/python/src/producer.cc
@@ -20,13 +20,17 @@
 
 #include <functional>
 
-void Producer_send(Producer& producer, const Message& message) {
+extern boost::python::object MessageId_serialize(const MessageId& msgId);
+
+boost::python::object Producer_send(Producer& producer, const Message& message) {
     Result res;
+    MessageId messageId;
     Py_BEGIN_ALLOW_THREADS
-    res = producer.send(message);
+    res = producer.send(message, messageId);
     Py_END_ALLOW_THREADS
 
     CHECK_RESULT(res);
+    return MessageId_serialize(messageId);
 }
 
 void Producer_sendAsyncCallback(PyObject* callback, Result res, const MessageId& msgId) {
diff --git a/pulsar-client-cpp/tests/ProducerTest.cc b/pulsar-client-cpp/tests/ProducerTest.cc
index 6769353..38f8d32 100644
--- a/pulsar-client-cpp/tests/ProducerTest.cc
+++ b/pulsar-client-cpp/tests/ProducerTest.cc
@@ -19,8 +19,10 @@
 #include <pulsar/Client.h>
 #include <gtest/gtest.h>
 
-#include "../lib/Future.h"
-#include "../lib/Utils.h"
+#include "lib/Future.h"
+#include "lib/Utils.h"
+#include "lib/LogUtils.h"
+DECLARE_LOG_OBJECT()
 
 using namespace pulsar;
 
@@ -71,3 +73,25 @@ TEST(ProducerTest, exactlyOnceWithProducerNameSpecified) {
     Result result = client.createProducer(topicName, producerConfiguration2, producer3);
     ASSERT_EQ(ResultProducerBusy, result);
 }
+
+TEST(ProducerTest, testSynchronouslySend) {
+    Client client(serviceUrl);
+    const std::string topic = "ProducerTestSynchronouslySend";
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub-name", consumer));
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+    MessageId messageId;
+    ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("hello").build(), messageId));
+    LOG_INFO("Send message to " << messageId);
+
+    Message receivedMessage;
+    ASSERT_EQ(ResultOk, consumer.receive(receivedMessage, 3000));
+    LOG_INFO("Received message from " << receivedMessage.getMessageId());
+    ASSERT_EQ(receivedMessage.getMessageId(), messageId);
+    ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMessage));
+
+    client.close();
+}