You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/02/01 19:03:58 UTC
[incubator-pulsar] branch master updated: Support Pulsar proxy from
C++/Python client library (#1124)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 58d5727 Support Pulsar proxy from C++/Python client library (#1124)
58d5727 is described below
commit 58d572797ebd780fecb1c4f4478fad1d590f14c3
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Feb 1 11:03:56 2018 -0800
Support Pulsar proxy from C++/Python client library (#1124)
---
pulsar-client-cpp/lib/BinaryProtoLookupService.cc | 10 ++++++---
pulsar-client-cpp/lib/ClientConnection.cc | 25 +++++++++++++++--------
pulsar-client-cpp/lib/ClientConnection.h | 15 +++++++++-----
pulsar-client-cpp/lib/ClientImpl.cc | 6 +++++-
pulsar-client-cpp/lib/Commands.cc | 10 ++++++++-
pulsar-client-cpp/lib/Commands.h | 3 ++-
pulsar-client-cpp/lib/ConnectionPool.cc | 19 +++++++++--------
pulsar-client-cpp/lib/ConnectionPool.h | 19 ++++++++++++++++-
pulsar-client-cpp/lib/HTTPLookupService.cc | 1 +
pulsar-client-cpp/lib/LookupDataResult.h | 15 +++++++++++---
pulsar-client-cpp/lib/LookupService.h | 4 ++++
pulsar-client-cpp/lib/Url.cc | 6 ++++++
pulsar-client-cpp/lib/Url.h | 2 ++
pulsar-client-cpp/lib/lz4/lz4.h | 5 +++++
pulsar-client-cpp/python/test_producer.py | 4 ++--
15 files changed, 110 insertions(+), 34 deletions(-)
diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
index 6303449..e5b8d42 100644
--- a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
+++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
@@ -53,7 +53,7 @@ Future<Result, LookupDataResultPtr> BinaryProtoLookupService::lookupAsync(
}
std::string lookupName = dn->toString();
LookupDataResultPromisePtr promise = boost::make_shared<LookupDataResultPromise>();
- Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(serviceUrl_);
+ Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(serviceUrl_, serviceUrl_);
future.addListener(boost::bind(&BinaryProtoLookupService::sendTopicLookupRequest, this, lookupName, false,
_1, _2, promise));
return promise->getFuture();
@@ -71,7 +71,7 @@ Future<Result, LookupDataResultPtr> BinaryProtoLookupService::getPartitionMetada
return promise->getFuture();
}
std::string lookupName = dn->toString();
- Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(serviceUrl_);
+ Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(serviceUrl_, serviceUrl_);
future.addListener(boost::bind(&BinaryProtoLookupService::sendPartitionMetadataLookupRequest, this,
lookupName, _1, _2, promise));
return promise->getFuture();
@@ -100,8 +100,12 @@ void BinaryProtoLookupService::handleLookup(const std::string& destinationName,
if (data->isRedirect()) {
LOG_DEBUG("Lookup request is for " << destinationName << " redirected to "
<< data->getBrokerUrl());
+
+ const std::string& logicalAddress = data->getBrokerUrl();
+ const std::string& physicalAddress =
+ data->shouldProxyThroughServiceUrl() ? serviceUrl_ : logicalAddress;
Future<Result, ClientConnectionWeakPtr> future =
- cnxPool_.getConnectionAsync(data->getBrokerUrl());
+ cnxPool_.getConnectionAsync(logicalAddress, physicalAddress);
future.addListener(boost::bind(&BinaryProtoLookupService::sendTopicLookupRequest, this,
destinationName, data->isAuthoritative(), _1, _2, promise));
} else {
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index 7e954cd..8bdd7a2 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -104,7 +104,8 @@ static Result getResult(ServerError serverError) {
return ResultUnknownError;
}
-ClientConnection::ClientConnection(const std::string& endpoint, ExecutorServicePtr executor,
+ClientConnection::ClientConnection(const std::string& logicalAddress, const std::string& physicalAddress,
+ ExecutorServicePtr executor,
const ClientConfiguration& clientConfiguration,
const AuthenticationPtr& authentication)
: state_(Pending),
@@ -114,8 +115,9 @@ ClientConnection::ClientConnection(const std::string& endpoint, ExecutorServiceP
executor_(executor),
resolver_(executor->createTcpResolver()),
socket_(executor->createSocket()),
- address_(endpoint),
- cnxString_("[<none> -> " + endpoint + "] "),
+ logicalAddress_(logicalAddress),
+ physicalAddress_(physicalAddress),
+ cnxString_("[<none> -> " + physicalAddress + "] "),
error_(boost::system::error_code()),
incomingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
incomingCmd_(),
@@ -267,7 +269,11 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint() << "] ";
cnxString_ = cnxStringStream.str();
- LOG_INFO(cnxString_ << "Connected to broker");
+ if (logicalAddress_ == physicalAddress_) {
+ LOG_INFO(cnxString_ << "Connected to broker");
+ } else {
+ LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_);
+ }
state_ = TcpConnected;
socket_->set_option(tcp::no_delay(true));
@@ -288,7 +294,7 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
if (!isTlsAllowInsecureConnection_) {
boost::system::error_code err;
Url service_url;
- if (!Url::parse(address_, service_url)) {
+ if (!Url::parse(physicalAddress_, service_url)) {
LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message());
close();
return;
@@ -315,7 +321,8 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
}
void ClientConnection::handleHandshake(const boost::system::error_code& err) {
- SharedBuffer buffer = Commands::newConnect(authentication_);
+ bool connectingThroughProxy = logicalAddress_ != physicalAddress_;
+ SharedBuffer buffer = Commands::newConnect(authentication_, logicalAddress_, connectingThroughProxy);
// Send CONNECT command to broker
asyncWrite(buffer.const_asio_buffer(),
boost::bind(&ClientConnection::handleSentPulsarConnect, shared_from_this(),
@@ -343,7 +350,7 @@ void ClientConnection::handleSentPulsarConnect(const boost::system::error_code&
void ClientConnection::tcpConnectAsync() {
boost::system::error_code err;
Url service_url;
- if (!Url::parse(address_, service_url)) {
+ if (!Url::parse(physicalAddress_, service_url)) {
LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message());
close();
return;
@@ -788,6 +795,8 @@ void ClientConnection::handleIncomingCommand() {
lookupResultPtr->setAuthoritative(lookupTopicResponse.authoritative());
lookupResultPtr->setRedirect(lookupTopicResponse.response() ==
CommandLookupTopicResponse::Redirect);
+ lookupResultPtr->setShouldProxyThroughServiceUrl(
+ lookupTopicResponse.proxy_through_service_url());
lookupDataPromise->setValue(lookupResultPtr);
}
@@ -1178,7 +1187,7 @@ void ClientConnection::removeConsumer(int consumerId) {
consumers_.erase(consumerId);
}
-const std::string& ClientConnection::brokerAddress() const { return address_; }
+const std::string& ClientConnection::brokerAddress() const { return physicalAddress_; }
const std::string& ClientConnection::cnxString() const { return cnxString_; }
diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h
index 4fb4abd..6b0d884 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -87,12 +87,14 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection>
typedef std::vector<ConnectionListener>::iterator ListenerIterator;
/*
- * endpoint - url of the service, for ex. pulsar://localhost:6650
- * connected - set when tcp connection is established
+ * logicalAddress - url of the service, for ex. pulsar://localhost:6650
+ * physicalAddress - the address to connect to; it can differ from the logical address when a proxy is used
+ * connected - set when tcp connection is established
*
*/
- ClientConnection(const std::string& endpoint, ExecutorServicePtr executor,
- const ClientConfiguration& clientConfiguration, const AuthenticationPtr& authentication);
+ ClientConnection(const std::string& logicalAddress, const std::string& physicalAddress,
+ ExecutorServicePtr executor, const ClientConfiguration& clientConfiguration,
+ const AuthenticationPtr& authentication);
~ClientConnection();
/*
@@ -226,10 +228,13 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection>
*/
SocketPtr socket_;
TlsSocketPtr tlsSocket_;
+
+ const std::string logicalAddress_;
+
/*
* stores address of the service, for ex. pulsar://localhost:6650
*/
- const std::string address_;
+ const std::string physicalAddress_;
// Represent both endpoint of the tcp connection. eg: [client:1234 -> server:6650]
std::string cnxString_;
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index a95cd49..b760135 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -258,7 +258,11 @@ void ClientImpl::handleLookup(Result result, LookupDataResultPtr data,
Promise<Result, ClientConnectionWeakPtr> promise) {
if (data) {
LOG_DEBUG("Getting connection to broker: " << data->getBrokerUrl());
- Future<Result, ClientConnectionWeakPtr> future = pool_.getConnectionAsync(data->getBrokerUrl());
+ const std::string& logicalAddress = data->getBrokerUrl();
+ const std::string& physicalAddress =
+ data->shouldProxyThroughServiceUrl() ? serviceUrl_ : logicalAddress;
+ Future<Result, ClientConnectionWeakPtr> future =
+ pool_.getConnectionAsync(logicalAddress, physicalAddress);
future.addListener(boost::bind(&ClientImpl::handleNewConnection, this, _1, _2, promise));
} else {
promise.setFailed(result);
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index 59cf2a2..0c3fdbd 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -22,6 +22,7 @@
#include "pulsar/MessageBuilder.h"
#include "LogUtils.h"
#include "Utils.h"
+#include "Url.h"
#include "checksum/ChecksumProvider.h"
#include <algorithm>
#include <boost/thread/mutex.hpp>
@@ -160,13 +161,20 @@ PairSharedBuffer Commands::newSend(SharedBuffer& headers, BaseCommand& cmd, uint
return composite;
}
-SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication) {
+SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication, const std::string& logicalAddress,
+ bool connectingThroughProxy) {
BaseCommand cmd;
cmd.set_type(BaseCommand::CONNECT);
CommandConnect* connect = cmd.mutable_connect();
connect->set_client_version(_PULSAR_VERSION_);
connect->set_auth_method_name(authentication->getAuthMethodName());
connect->set_protocol_version(ProtocolVersion_MAX);
+ if (connectingThroughProxy) {
+ Url logicalAddressUrl;
+ Url::parse(logicalAddress, logicalAddressUrl);
+ connect->set_proxy_to_broker_url(logicalAddressUrl.hostPort());
+ }
+
AuthenticationDataPtr authDataContent;
if (authentication->getAuthData(authDataContent) == ResultOk && authDataContent->hasDataFromCommand()) {
connect->set_auth_data(authDataContent->getCommandData());
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index 1fd9e48..bda48d2 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -64,7 +64,8 @@ class Commands {
const static uint16_t magicCrc32c = 0x0e01;
const static int checksumSize = 4;
- static SharedBuffer newConnect(const AuthenticationPtr& authentication);
+ static SharedBuffer newConnect(const AuthenticationPtr& authentication, const std::string& logicalAddress,
+ bool connectingThroughProxy);
static SharedBuffer newPartitionMetadataRequest(const std::string& topic, uint64_t requestId);
diff --git a/pulsar-client-cpp/lib/ConnectionPool.cc b/pulsar-client-cpp/lib/ConnectionPool.cc
index 6ee0e9d..36ce4a5 100644
--- a/pulsar-client-cpp/lib/ConnectionPool.cc
+++ b/pulsar-client-cpp/lib/ConnectionPool.cc
@@ -33,36 +33,37 @@ ConnectionPool::ConnectionPool(const ClientConfiguration& conf, ExecutorServiceP
poolConnections_(poolConnections),
mutex_() {}
-Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const std::string& endpoint) {
+Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
+ const std::string& logicalAddress, const std::string& physicalAddress) {
boost::unique_lock<boost::mutex> lock(mutex_);
if (poolConnections_) {
- PoolMap::iterator cnxIt = pool_.find(endpoint);
+ PoolMap::iterator cnxIt = pool_.find(logicalAddress);
if (cnxIt != pool_.end()) {
ClientConnectionPtr cnx = cnxIt->second.lock();
if (cnx && !cnx->isClosed()) {
// Found a valid or pending connection in the pool
- LOG_DEBUG("Got connection from pool for " << endpoint << " use_count: " //
+ LOG_DEBUG("Got connection from pool for " << logicalAddress << " use_count: " //
<< (cnx.use_count() - 1) << " @ " << cnx.get());
return cnx->getConnectFuture();
} else {
// Deleting stale connection
LOG_INFO("Deleting stale connection from pool for "
- << endpoint << " use_count: " << (cnx.use_count() - 1) << " @ " << cnx.get());
- pool_.erase(endpoint);
+ << logicalAddress << " use_count: " << (cnx.use_count() - 1) << " @ " << cnx.get());
+ pool_.erase(logicalAddress);
}
}
}
// No valid or pending connection found in the pool, creating a new one
- ClientConnectionPtr cnx(
- new ClientConnection(endpoint, executorProvider_->get(), clientConfiguration_, authentication_));
+ ClientConnectionPtr cnx(new ClientConnection(logicalAddress, physicalAddress, executorProvider_->get(),
+ clientConfiguration_, authentication_));
- LOG_INFO("Created connection for " << endpoint);
+ LOG_INFO("Created connection for " << logicalAddress);
Future<Result, ClientConnectionWeakPtr> future = cnx->getConnectFuture();
- pool_.insert(std::make_pair(endpoint, cnx));
+ pool_.insert(std::make_pair(logicalAddress, cnx));
lock.unlock();
diff --git a/pulsar-client-cpp/lib/ConnectionPool.h b/pulsar-client-cpp/lib/ConnectionPool.h
index 8a12890..471454b 100644
--- a/pulsar-client-cpp/lib/ConnectionPool.h
+++ b/pulsar-client-cpp/lib/ConnectionPool.h
@@ -36,7 +36,24 @@ class ConnectionPool {
ConnectionPool(const ClientConfiguration& conf, ExecutorServiceProviderPtr executorProvider,
const AuthenticationPtr& authentication, bool poolConnections = true);
- Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& endpoint);
+ /**
+ * Get a connection from the pool.
+ * <p>
+ * The connection can either be created or be coming from the pool itself.
+ * <p>
+ * When specifying multiple addresses, the logicalAddress is used as a tag for the broker,
+ * while the physicalAddress is where the connection is actually happening.
+ * <p>
+ * These two addresses can be different when the client is forced to connect through
+ * a proxy layer. Essentially, the pool is using the logical address as a way to
+ * decide whether to reuse a particular connection.
+ *
+ * @param logicalAddress the address to use as the broker tag
+ * @param physicalAddress the real address where the TCP connection should be made
+ * @return a future that will produce the ClientCnx object
+ */
+ Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& logicalAddress,
+ const std::string& physicalAddress);
private:
ClientConfiguration clientConfiguration_;
diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc b/pulsar-client-cpp/lib/HTTPLookupService.cc
index 816a7e9..1a98f60 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.cc
+++ b/pulsar-client-cpp/lib/HTTPLookupService.cc
@@ -217,6 +217,7 @@ LookupDataResultPtr HTTPLookupService::parseLookupData(const std::string &json)
LookupDataResultPtr lookupDataResultPtr = boost::make_shared<LookupDataResult>();
lookupDataResultPtr->setBrokerUrl(brokerUrl);
lookupDataResultPtr->setBrokerUrlSsl(brokerUrlSsl);
+
LOG_INFO("parseLookupData = " << *lookupDataResultPtr);
return lookupDataResultPtr;
}
diff --git a/pulsar-client-cpp/lib/LookupDataResult.h b/pulsar-client-cpp/lib/LookupDataResult.h
index 698a21f..5c1387a 100644
--- a/pulsar-client-cpp/lib/LookupDataResult.h
+++ b/pulsar-client-cpp/lib/LookupDataResult.h
@@ -32,8 +32,8 @@ class LookupDataResult {
public:
void setBrokerUrl(const std::string& brokerUrl) { brokerUrl_ = brokerUrl; }
void setBrokerUrlSsl(const std::string& brokerUrlSsl) { brokerUrlSsl_ = brokerUrlSsl; }
- std::string getBrokerUrl() { return brokerUrl_; }
- std::string getBrokerUrlSsl() { return brokerUrlSsl_; }
+ const std::string& getBrokerUrl() const { return brokerUrl_; }
+ const std::string& getBrokerUrlSsl() const { return brokerUrlSsl_; }
bool isAuthoritative() const { return authoritative; }
@@ -47,6 +47,12 @@ class LookupDataResult {
void setRedirect(bool redirect) { this->redirect = redirect; }
+ bool shouldProxyThroughServiceUrl() const { return proxyThroughServiceUrl_; }
+
+ void setShouldProxyThroughServiceUrl(bool proxyThroughServiceUrl) {
+ proxyThroughServiceUrl_ = proxyThroughServiceUrl;
+ }
+
private:
friend inline std::ostream& operator<<(std::ostream& os, const LookupDataResult& b);
std::string brokerUrl_;
@@ -54,12 +60,15 @@ class LookupDataResult {
int partitions;
bool authoritative;
bool redirect;
+
+ bool proxyThroughServiceUrl_;
};
std::ostream& operator<<(std::ostream& os, const LookupDataResult& b) {
os << "{ LookupDataResult [brokerUrl_ = " << b.brokerUrl_ << "] [brokerUrlSsl_ = " << b.brokerUrlSsl_
<< "] [partitions = " << b.partitions << "] [authoritative = " << b.authoritative
- << "] [redirect = " << b.redirect << "]";
+ << "] [redirect = " << b.redirect << "] proxyThroughServiceUrl = " << b.proxyThroughServiceUrl_
+ << "] }";
return os;
}
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/LookupService.h b/pulsar-client-cpp/lib/LookupService.h
index 4e8dbe6..36ba800 100644
--- a/pulsar-client-cpp/lib/LookupService.h
+++ b/pulsar-client-cpp/lib/LookupService.h
@@ -41,7 +41,11 @@ class LookupService {
* Gets Partition metadata
*/
virtual Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const DestinationNamePtr& dn) = 0;
+
+ virtual ~LookupService() {}
};
+
typedef boost::shared_ptr<LookupService> LookupServicePtr;
+
} // namespace pulsar
#endif // PULSAR_CPP_LOOKUPSERVICE_H
diff --git a/pulsar-client-cpp/lib/Url.cc b/pulsar-client-cpp/lib/Url.cc
index af80ea0..0fb22ba 100644
--- a/pulsar-client-cpp/lib/Url.cc
+++ b/pulsar-client-cpp/lib/Url.cc
@@ -88,6 +88,12 @@ const std::string& Url::file() const { return file_; }
const std::string& Url::parameter() const { return parameter_; }
+std::string Url::hostPort() const {
+ std::stringstream ss;
+ ss << host_ << ':' << port_;
+ return ss.str();
+}
+
std::ostream& operator<<(std::ostream& os, const Url& obj) {
os << "Url [Host = " << obj.host() << ", Protocol = " << obj.protocol() << ", Port = " << obj.port()
<< "]";
diff --git a/pulsar-client-cpp/lib/Url.h b/pulsar-client-cpp/lib/Url.h
index aaf03dc..2b413cf 100644
--- a/pulsar-client-cpp/lib/Url.h
+++ b/pulsar-client-cpp/lib/Url.h
@@ -41,6 +41,8 @@ class Url {
const std::string& parameter() const;
friend std::ostream& operator<<(std::ostream& os, const Url& obj);
+ std::string hostPort() const;
+
private:
std::string protocol_;
std::string host_;
diff --git a/pulsar-client-cpp/lib/lz4/lz4.h b/pulsar-client-cpp/lib/lz4/lz4.h
index bff8f97..c68232a 100644
--- a/pulsar-client-cpp/lib/lz4/lz4.h
+++ b/pulsar-client-cpp/lib/lz4/lz4.h
@@ -197,9 +197,11 @@ int LZ4_decompress_safe_partial(const char* source, char* dest, int compressedSi
* note : only allocated directly the structure if you are statically linking LZ4
* If you are using liblz4 as a DLL, please use below construction methods instead.
*/
+// clang-format off
typedef struct {
long long table[LZ4_STREAMSIZE_U64];
} LZ4_stream_t;
+// clang-format on
/*
* LZ4_resetStream
@@ -254,9 +256,12 @@ int LZ4_saveDict(LZ4_stream_t* streamPtr, char* safeBuffer, int dictSize);
#define LZ4_STREAMDECODESIZE_U64 4
#define LZ4_STREAMDECODESIZE (LZ4_STREAMDECODESIZE_U64 * sizeof(unsigned long long))
+// clang-format off
typedef struct {
unsigned long long table[LZ4_STREAMDECODESIZE_U64];
} LZ4_streamDecode_t;
+// clang-format on
+
/*
* LZ4_streamDecode_t
* information structure to track an LZ4 stream.
diff --git a/pulsar-client-cpp/python/test_producer.py b/pulsar-client-cpp/python/test_producer.py
index 24a2c58..8e3c655 100755
--- a/pulsar-client-cpp/python/test_producer.py
+++ b/pulsar-client-cpp/python/test_producer.py
@@ -30,9 +30,9 @@ producer = client.create_producer(
batching_max_publish_delay_ms=10
)
-while True:
+for i in range(10):
try:
- producer.send_async('hello', None)
+ producer.send('hello', None)
except Exception as e:
print("Failed to send message: %s", e)
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.