You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/02/01 19:03:58 UTC

[incubator-pulsar] branch master updated: Support Pulsar proxy from C++/Python client library (#1124)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 58d5727  Support Pulsar proxy from C++/Python client library (#1124)
58d5727 is described below

commit 58d572797ebd780fecb1c4f4478fad1d590f14c3
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Feb 1 11:03:56 2018 -0800

    Support Pulsar proxy from C++/Python client library (#1124)
---
 pulsar-client-cpp/lib/BinaryProtoLookupService.cc | 10 ++++++---
 pulsar-client-cpp/lib/ClientConnection.cc         | 25 +++++++++++++++--------
 pulsar-client-cpp/lib/ClientConnection.h          | 15 +++++++++-----
 pulsar-client-cpp/lib/ClientImpl.cc               |  6 +++++-
 pulsar-client-cpp/lib/Commands.cc                 | 10 ++++++++-
 pulsar-client-cpp/lib/Commands.h                  |  3 ++-
 pulsar-client-cpp/lib/ConnectionPool.cc           | 19 +++++++++--------
 pulsar-client-cpp/lib/ConnectionPool.h            | 19 ++++++++++++++++-
 pulsar-client-cpp/lib/HTTPLookupService.cc        |  1 +
 pulsar-client-cpp/lib/LookupDataResult.h          | 15 +++++++++++---
 pulsar-client-cpp/lib/LookupService.h             |  4 ++++
 pulsar-client-cpp/lib/Url.cc                      |  6 ++++++
 pulsar-client-cpp/lib/Url.h                       |  2 ++
 pulsar-client-cpp/lib/lz4/lz4.h                   |  5 +++++
 pulsar-client-cpp/python/test_producer.py         |  4 ++--
 15 files changed, 110 insertions(+), 34 deletions(-)

diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
index 6303449..e5b8d42 100644
--- a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
+++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
@@ -53,7 +53,7 @@ Future<Result, LookupDataResultPtr> BinaryProtoLookupService::lookupAsync(
     }
     std::string lookupName = dn->toString();
     LookupDataResultPromisePtr promise = boost::make_shared<LookupDataResultPromise>();
-    Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(serviceUrl_);
+    Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(serviceUrl_, serviceUrl_);
     future.addListener(boost::bind(&BinaryProtoLookupService::sendTopicLookupRequest, this, lookupName, false,
                                    _1, _2, promise));
     return promise->getFuture();
@@ -71,7 +71,7 @@ Future<Result, LookupDataResultPtr> BinaryProtoLookupService::getPartitionMetada
         return promise->getFuture();
     }
     std::string lookupName = dn->toString();
-    Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(serviceUrl_);
+    Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(serviceUrl_, serviceUrl_);
     future.addListener(boost::bind(&BinaryProtoLookupService::sendPartitionMetadataLookupRequest, this,
                                    lookupName, _1, _2, promise));
     return promise->getFuture();
@@ -100,8 +100,12 @@ void BinaryProtoLookupService::handleLookup(const std::string& destinationName,
         if (data->isRedirect()) {
             LOG_DEBUG("Lookup request is for " << destinationName << " redirected to "
                                                << data->getBrokerUrl());
+
+            const std::string& logicalAddress = data->getBrokerUrl();
+            const std::string& physicalAddress =
+                data->shouldProxyThroughServiceUrl() ? serviceUrl_ : logicalAddress;
             Future<Result, ClientConnectionWeakPtr> future =
-                cnxPool_.getConnectionAsync(data->getBrokerUrl());
+                cnxPool_.getConnectionAsync(logicalAddress, physicalAddress);
             future.addListener(boost::bind(&BinaryProtoLookupService::sendTopicLookupRequest, this,
                                            destinationName, data->isAuthoritative(), _1, _2, promise));
         } else {
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index 7e954cd..8bdd7a2 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -104,7 +104,8 @@ static Result getResult(ServerError serverError) {
     return ResultUnknownError;
 }
 
-ClientConnection::ClientConnection(const std::string& endpoint, ExecutorServicePtr executor,
+ClientConnection::ClientConnection(const std::string& logicalAddress, const std::string& physicalAddress,
+                                   ExecutorServicePtr executor,
                                    const ClientConfiguration& clientConfiguration,
                                    const AuthenticationPtr& authentication)
     : state_(Pending),
@@ -114,8 +115,9 @@ ClientConnection::ClientConnection(const std::string& endpoint, ExecutorServiceP
       executor_(executor),
       resolver_(executor->createTcpResolver()),
       socket_(executor->createSocket()),
-      address_(endpoint),
-      cnxString_("[<none> -> " + endpoint + "] "),
+      logicalAddress_(logicalAddress),
+      physicalAddress_(physicalAddress),
+      cnxString_("[<none> -> " + physicalAddress + "] "),
       error_(boost::system::error_code()),
       incomingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
       incomingCmd_(),
@@ -267,7 +269,11 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
         cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint() << "] ";
         cnxString_ = cnxStringStream.str();
 
-        LOG_INFO(cnxString_ << "Connected to broker");
+        if (logicalAddress_ == physicalAddress_) {
+            LOG_INFO(cnxString_ << "Connected to broker");
+        } else {
+            LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_);
+        }
         state_ = TcpConnected;
         socket_->set_option(tcp::no_delay(true));
 
@@ -288,7 +294,7 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
             if (!isTlsAllowInsecureConnection_) {
                 boost::system::error_code err;
                 Url service_url;
-                if (!Url::parse(address_, service_url)) {
+                if (!Url::parse(physicalAddress_, service_url)) {
                     LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message());
                     close();
                     return;
@@ -315,7 +321,8 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
 }
 
 void ClientConnection::handleHandshake(const boost::system::error_code& err) {
-    SharedBuffer buffer = Commands::newConnect(authentication_);
+    bool connectingThroughProxy = logicalAddress_ != physicalAddress_;
+    SharedBuffer buffer = Commands::newConnect(authentication_, logicalAddress_, connectingThroughProxy);
     // Send CONNECT command to broker
     asyncWrite(buffer.const_asio_buffer(),
                boost::bind(&ClientConnection::handleSentPulsarConnect, shared_from_this(),
@@ -343,7 +350,7 @@ void ClientConnection::handleSentPulsarConnect(const boost::system::error_code&
 void ClientConnection::tcpConnectAsync() {
     boost::system::error_code err;
     Url service_url;
-    if (!Url::parse(address_, service_url)) {
+    if (!Url::parse(physicalAddress_, service_url)) {
         LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message());
         close();
         return;
@@ -788,6 +795,8 @@ void ClientConnection::handleIncomingCommand() {
                             lookupResultPtr->setAuthoritative(lookupTopicResponse.authoritative());
                             lookupResultPtr->setRedirect(lookupTopicResponse.response() ==
                                                          CommandLookupTopicResponse::Redirect);
+                            lookupResultPtr->setShouldProxyThroughServiceUrl(
+                                lookupTopicResponse.proxy_through_service_url());
                             lookupDataPromise->setValue(lookupResultPtr);
                         }
 
@@ -1178,7 +1187,7 @@ void ClientConnection::removeConsumer(int consumerId) {
     consumers_.erase(consumerId);
 }
 
-const std::string& ClientConnection::brokerAddress() const { return address_; }
+const std::string& ClientConnection::brokerAddress() const { return physicalAddress_; }
 
 const std::string& ClientConnection::cnxString() const { return cnxString_; }
 
diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h
index 4fb4abd..6b0d884 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -87,12 +87,14 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection>
     typedef std::vector<ConnectionListener>::iterator ListenerIterator;
 
     /*
-     *  endpoint  -  url of the service, for ex. pulsar://localhost:6650
-     *  connected -  set when tcp connection is established
+     *  logicalAddress -  url of the service, for ex. pulsar://localhost:6650
+     *  physicalAddress - address to connect to; it can differ from the logical address when a proxy is used
+     *  connected -  set when tcp connection is established
      *
      */
-    ClientConnection(const std::string& endpoint, ExecutorServicePtr executor,
-                     const ClientConfiguration& clientConfiguration, const AuthenticationPtr& authentication);
+    ClientConnection(const std::string& logicalAddress, const std::string& physicalAddress,
+                     ExecutorServicePtr executor, const ClientConfiguration& clientConfiguration,
+                     const AuthenticationPtr& authentication);
     ~ClientConnection();
 
     /*
@@ -226,10 +228,13 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection>
      */
     SocketPtr socket_;
     TlsSocketPtr tlsSocket_;
+
+    const std::string logicalAddress_;
+
     /*
      *  stores address of the service, for ex. pulsar://localhost:6650
      */
-    const std::string address_;
+    const std::string physicalAddress_;
 
     // Represent both endpoint of the tcp connection. eg: [client:1234 -> server:6650]
     std::string cnxString_;
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index a95cd49..b760135 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -258,7 +258,11 @@ void ClientImpl::handleLookup(Result result, LookupDataResultPtr data,
                               Promise<Result, ClientConnectionWeakPtr> promise) {
     if (data) {
         LOG_DEBUG("Getting connection to broker: " << data->getBrokerUrl());
-        Future<Result, ClientConnectionWeakPtr> future = pool_.getConnectionAsync(data->getBrokerUrl());
+        const std::string& logicalAddress = data->getBrokerUrl();
+        const std::string& physicalAddress =
+            data->shouldProxyThroughServiceUrl() ? serviceUrl_ : logicalAddress;
+        Future<Result, ClientConnectionWeakPtr> future =
+            pool_.getConnectionAsync(logicalAddress, physicalAddress);
         future.addListener(boost::bind(&ClientImpl::handleNewConnection, this, _1, _2, promise));
     } else {
         promise.setFailed(result);
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index 59cf2a2..0c3fdbd 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -22,6 +22,7 @@
 #include "pulsar/MessageBuilder.h"
 #include "LogUtils.h"
 #include "Utils.h"
+#include "Url.h"
 #include "checksum/ChecksumProvider.h"
 #include <algorithm>
 #include <boost/thread/mutex.hpp>
@@ -160,13 +161,20 @@ PairSharedBuffer Commands::newSend(SharedBuffer& headers, BaseCommand& cmd, uint
     return composite;
 }
 
-SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication) {
+SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication, const std::string& logicalAddress,
+                                  bool connectingThroughProxy) {
     BaseCommand cmd;
     cmd.set_type(BaseCommand::CONNECT);
     CommandConnect* connect = cmd.mutable_connect();
     connect->set_client_version(_PULSAR_VERSION_);
     connect->set_auth_method_name(authentication->getAuthMethodName());
     connect->set_protocol_version(ProtocolVersion_MAX);
+    if (connectingThroughProxy) {
+        Url logicalAddressUrl;
+        Url::parse(logicalAddress, logicalAddressUrl);
+        connect->set_proxy_to_broker_url(logicalAddressUrl.hostPort());
+    }
+
     AuthenticationDataPtr authDataContent;
     if (authentication->getAuthData(authDataContent) == ResultOk && authDataContent->hasDataFromCommand()) {
         connect->set_auth_data(authDataContent->getCommandData());
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index 1fd9e48..bda48d2 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -64,7 +64,8 @@ class Commands {
     const static uint16_t magicCrc32c = 0x0e01;
     const static int checksumSize = 4;
 
-    static SharedBuffer newConnect(const AuthenticationPtr& authentication);
+    static SharedBuffer newConnect(const AuthenticationPtr& authentication, const std::string& logicalAddress,
+                                   bool connectingThroughProxy);
 
     static SharedBuffer newPartitionMetadataRequest(const std::string& topic, uint64_t requestId);
 
diff --git a/pulsar-client-cpp/lib/ConnectionPool.cc b/pulsar-client-cpp/lib/ConnectionPool.cc
index 6ee0e9d..36ce4a5 100644
--- a/pulsar-client-cpp/lib/ConnectionPool.cc
+++ b/pulsar-client-cpp/lib/ConnectionPool.cc
@@ -33,36 +33,37 @@ ConnectionPool::ConnectionPool(const ClientConfiguration& conf, ExecutorServiceP
       poolConnections_(poolConnections),
       mutex_() {}
 
-Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const std::string& endpoint) {
+Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
+    const std::string& logicalAddress, const std::string& physicalAddress) {
     boost::unique_lock<boost::mutex> lock(mutex_);
 
     if (poolConnections_) {
-        PoolMap::iterator cnxIt = pool_.find(endpoint);
+        PoolMap::iterator cnxIt = pool_.find(logicalAddress);
         if (cnxIt != pool_.end()) {
             ClientConnectionPtr cnx = cnxIt->second.lock();
 
             if (cnx && !cnx->isClosed()) {
                 // Found a valid or pending connection in the pool
-                LOG_DEBUG("Got connection from pool for " << endpoint << " use_count: "  //
+                LOG_DEBUG("Got connection from pool for " << logicalAddress << " use_count: "  //
                                                           << (cnx.use_count() - 1) << " @ " << cnx.get());
                 return cnx->getConnectFuture();
             } else {
                 // Deleting stale connection
                 LOG_INFO("Deleting stale connection from pool for "
-                         << endpoint << " use_count: " << (cnx.use_count() - 1) << " @ " << cnx.get());
-                pool_.erase(endpoint);
+                         << logicalAddress << " use_count: " << (cnx.use_count() - 1) << " @ " << cnx.get());
+                pool_.erase(logicalAddress);
             }
         }
     }
 
     // No valid or pending connection found in the pool, creating a new one
-    ClientConnectionPtr cnx(
-        new ClientConnection(endpoint, executorProvider_->get(), clientConfiguration_, authentication_));
+    ClientConnectionPtr cnx(new ClientConnection(logicalAddress, physicalAddress, executorProvider_->get(),
+                                                 clientConfiguration_, authentication_));
 
-    LOG_INFO("Created connection for " << endpoint);
+    LOG_INFO("Created connection for " << logicalAddress);
 
     Future<Result, ClientConnectionWeakPtr> future = cnx->getConnectFuture();
-    pool_.insert(std::make_pair(endpoint, cnx));
+    pool_.insert(std::make_pair(logicalAddress, cnx));
 
     lock.unlock();
 
diff --git a/pulsar-client-cpp/lib/ConnectionPool.h b/pulsar-client-cpp/lib/ConnectionPool.h
index 8a12890..471454b 100644
--- a/pulsar-client-cpp/lib/ConnectionPool.h
+++ b/pulsar-client-cpp/lib/ConnectionPool.h
@@ -36,7 +36,24 @@ class ConnectionPool {
     ConnectionPool(const ClientConfiguration& conf, ExecutorServiceProviderPtr executorProvider,
                    const AuthenticationPtr& authentication, bool poolConnections = true);
 
-    Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& endpoint);
+    /**
+     * Get a connection from the pool.
+     * <p>
+     * The connection can either be created or be coming from the pool itself.
+     * <p>
+     * When specifying multiple addresses, the logicalAddress is used as a tag for the broker,
+     * while the physicalAddress is where the connection is actually happening.
+     * <p>
+     * These two addresses can be different when the client is forced to connect through
+     * a proxy layer. Essentially, the pool is using the logical address as a way to
+     * decide whether to reuse a particular connection.
+     *
+     * @param logicalAddress the address to use as the broker tag
+     * @param physicalAddress the real address where the TCP connection should be made
+     * @return a future that will produce the ClientCnx object
+     */
+    Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& logicalAddress,
+                                                               const std::string& physicalAddress);
 
    private:
     ClientConfiguration clientConfiguration_;
diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc b/pulsar-client-cpp/lib/HTTPLookupService.cc
index 816a7e9..1a98f60 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.cc
+++ b/pulsar-client-cpp/lib/HTTPLookupService.cc
@@ -217,6 +217,7 @@ LookupDataResultPtr HTTPLookupService::parseLookupData(const std::string &json)
     LookupDataResultPtr lookupDataResultPtr = boost::make_shared<LookupDataResult>();
     lookupDataResultPtr->setBrokerUrl(brokerUrl);
     lookupDataResultPtr->setBrokerUrlSsl(brokerUrlSsl);
+
     LOG_INFO("parseLookupData = " << *lookupDataResultPtr);
     return lookupDataResultPtr;
 }
diff --git a/pulsar-client-cpp/lib/LookupDataResult.h b/pulsar-client-cpp/lib/LookupDataResult.h
index 698a21f..5c1387a 100644
--- a/pulsar-client-cpp/lib/LookupDataResult.h
+++ b/pulsar-client-cpp/lib/LookupDataResult.h
@@ -32,8 +32,8 @@ class LookupDataResult {
    public:
     void setBrokerUrl(const std::string& brokerUrl) { brokerUrl_ = brokerUrl; }
     void setBrokerUrlSsl(const std::string& brokerUrlSsl) { brokerUrlSsl_ = brokerUrlSsl; }
-    std::string getBrokerUrl() { return brokerUrl_; }
-    std::string getBrokerUrlSsl() { return brokerUrlSsl_; }
+    const std::string& getBrokerUrl() const { return brokerUrl_; }
+    const std::string& getBrokerUrlSsl() const { return brokerUrlSsl_; }
 
     bool isAuthoritative() const { return authoritative; }
 
@@ -47,6 +47,12 @@ class LookupDataResult {
 
     void setRedirect(bool redirect) { this->redirect = redirect; }
 
+    bool shouldProxyThroughServiceUrl() const { return proxyThroughServiceUrl_; }
+
+    void setShouldProxyThroughServiceUrl(bool proxyThroughServiceUrl) {
+        proxyThroughServiceUrl_ = proxyThroughServiceUrl;
+    }
+
    private:
     friend inline std::ostream& operator<<(std::ostream& os, const LookupDataResult& b);
     std::string brokerUrl_;
@@ -54,12 +60,15 @@ class LookupDataResult {
     int partitions;
     bool authoritative;
     bool redirect;
+
+    bool proxyThroughServiceUrl_;
 };
 
 std::ostream& operator<<(std::ostream& os, const LookupDataResult& b) {
     os << "{ LookupDataResult [brokerUrl_ = " << b.brokerUrl_ << "] [brokerUrlSsl_ = " << b.brokerUrlSsl_
        << "] [partitions = " << b.partitions << "] [authoritative = " << b.authoritative
-       << "] [redirect = " << b.redirect << "]";
+       << "] [redirect = " << b.redirect << "] proxyThroughServiceUrl = " << b.proxyThroughServiceUrl_
+       << "] }";
     return os;
 }
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/LookupService.h b/pulsar-client-cpp/lib/LookupService.h
index 4e8dbe6..36ba800 100644
--- a/pulsar-client-cpp/lib/LookupService.h
+++ b/pulsar-client-cpp/lib/LookupService.h
@@ -41,7 +41,11 @@ class LookupService {
      * Gets Partition metadata
      */
     virtual Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const DestinationNamePtr& dn) = 0;
+
+    virtual ~LookupService() {}
 };
+
 typedef boost::shared_ptr<LookupService> LookupServicePtr;
+
 }  // namespace pulsar
 #endif  // PULSAR_CPP_LOOKUPSERVICE_H
diff --git a/pulsar-client-cpp/lib/Url.cc b/pulsar-client-cpp/lib/Url.cc
index af80ea0..0fb22ba 100644
--- a/pulsar-client-cpp/lib/Url.cc
+++ b/pulsar-client-cpp/lib/Url.cc
@@ -88,6 +88,12 @@ const std::string& Url::file() const { return file_; }
 
 const std::string& Url::parameter() const { return parameter_; }
 
+std::string Url::hostPort() const {
+    std::stringstream ss;
+    ss << host_ << ':' << port_;
+    return ss.str();
+}
+
 std::ostream& operator<<(std::ostream& os, const Url& obj) {
     os << "Url [Host = " << obj.host() << ", Protocol = " << obj.protocol() << ", Port = " << obj.port()
        << "]";
diff --git a/pulsar-client-cpp/lib/Url.h b/pulsar-client-cpp/lib/Url.h
index aaf03dc..2b413cf 100644
--- a/pulsar-client-cpp/lib/Url.h
+++ b/pulsar-client-cpp/lib/Url.h
@@ -41,6 +41,8 @@ class Url {
     const std::string& parameter() const;
     friend std::ostream& operator<<(std::ostream& os, const Url& obj);
 
+    std::string hostPort() const;
+
    private:
     std::string protocol_;
     std::string host_;
diff --git a/pulsar-client-cpp/lib/lz4/lz4.h b/pulsar-client-cpp/lib/lz4/lz4.h
index bff8f97..c68232a 100644
--- a/pulsar-client-cpp/lib/lz4/lz4.h
+++ b/pulsar-client-cpp/lib/lz4/lz4.h
@@ -197,9 +197,11 @@ int LZ4_decompress_safe_partial(const char* source, char* dest, int compressedSi
  * note : only allocated directly the structure if you are statically linking LZ4
  *        If you are using liblz4 as a DLL, please use below construction methods instead.
  */
+// clang-format off
 typedef struct {
     long long table[LZ4_STREAMSIZE_U64];
 } LZ4_stream_t;
+// clang-format on
 
 /*
  * LZ4_resetStream
@@ -254,9 +256,12 @@ int LZ4_saveDict(LZ4_stream_t* streamPtr, char* safeBuffer, int dictSize);
 
 #define LZ4_STREAMDECODESIZE_U64 4
 #define LZ4_STREAMDECODESIZE (LZ4_STREAMDECODESIZE_U64 * sizeof(unsigned long long))
+// clang-format off
 typedef struct {
     unsigned long long table[LZ4_STREAMDECODESIZE_U64];
 } LZ4_streamDecode_t;
+// clang-format on
+
 /*
  * LZ4_streamDecode_t
  * information structure to track an LZ4 stream.
diff --git a/pulsar-client-cpp/python/test_producer.py b/pulsar-client-cpp/python/test_producer.py
index 24a2c58..8e3c655 100755
--- a/pulsar-client-cpp/python/test_producer.py
+++ b/pulsar-client-cpp/python/test_producer.py
@@ -30,9 +30,9 @@ producer = client.create_producer(
                     batching_max_publish_delay_ms=10
                 )
 
-while True:
+for i in range(10):
     try:
-        producer.send_async('hello', None)
+        producer.send('hello', None)
     except Exception as e:
         print("Failed to send message: %s", e)
 

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.