You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/04/05 14:08:44 UTC
[pulsar] branch branch-2.8 updated: [C++] Fix connection is not closed when broker closes the connection to proxy (#15009)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new d705b534d41 [C++] Fix connection is not closed when broker closes the connection to proxy (#15009)
d705b534d41 is described below
commit d705b534d41174740236569f2f804af4485b7bfe
Author: Yunze Xu <xy...@163.com>
AuthorDate: Mon Apr 4 18:47:16 2022 +0800
[C++] Fix connection is not closed when broker closes the connection to proxy (#15009)
### Motivation
Recently we found that the C++ client might not refresh the OAuth2 token
after reconnection. This is because when the Pulsar proxy is enabled, the
`AuthResponse` might be sent by the proxy, which leads to a disconnection
on the broker side. See https://github.com/apache/pulsar/blob/94cc46a66fbe322472dbb657803d21320e59079c/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L687
Then the proxy returns a `ServiceNotReady` error to the client as the
result of the topic lookup. See
https://github.com/apache/pulsar/blob/d1a302e60a8f61615800679522016f21dcd01295/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java#L150-L154
https://github.com/apache/pulsar/blob/d1a302e60a8f61615800679522016f21dcd01295/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java#L242-L247
However, in this case, the C++ client only completes the lookup future
with `ResultConnectError`. The `ClientConnection` object remains cached
in the pool and is reused for subsequent interactions.
### Modifications
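Like the Java client, add a `checkServerError` method to
`ClientConnection` that closes the socket on a `ServiceNotReady` error
(and also, for now, on `TooManyRequests`). We don't call `close` directly
here, so as not to interrupt the execution of `handleIncomingCommand`;
instead, `close` will be called in `handleRead` because the `err` will be
`boost::asio::error::bad_descriptor`.
In addition, when a lookup or partition-metadata response carries an
error, the lookup promise is now failed with the result mapped from the
server error (via `getResult`) instead of always `ResultConnectError`,
which is why `AuthTokenTest` now expects `ResultAuthorizationError`.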
### Verifying this change
It's hard to mock the `ServiceNotReady` case in a unit test. I have tested
this patch in my private environment with an OAuth2 token that expires
quickly, which produced the following logs (hosts are hidden):
```
2022-04-03 19:00:54.357 INFO [0x110af6600] HandlerBase:142 | [persistent://public/default/topic-1, test-oauth2-50-7] Schedule reconnection in 0.1 s
2022-04-03 19:00:54.458 INFO [0x7000056de000] HandlerBase:64 | [persistent://public/default/topic-1, test-oauth2-50-7] Getting connection from pool
2022-04-03 19:00:54.953 ERROR [0x7000056de000] ClientConnection:1001 | [LOCAL_IP:50660 -> REMOTE_IP:6651] Failed lookup req_id: 90 error: ServiceUnitNotReady msg: Disconnected from server at test-oauth2-broker.sndev.svc.cluster.local/172.20.116.73:6650
2022-04-03 19:00:54.953 INFO [0x7000056de000] HandlerBase:142 | [persistent://public/default/topic-1, test-oauth2-50-7] Schedule reconnection in 0.182 s
2022-04-03 19:00:54.953 ERROR [0x7000056de000] ClientConnection:597 | [LOCAL_IP:50660 -> REMOTE_IP:6651] Read operation failed: Bad file descriptor
2022-04-03 19:00:54.953 INFO [0x7000056de000] ClientConnection:1560 | [LOCAL_IP:50660 -> REMOTE_IP:6651] Connection closed
2022-04-03 19:00:54.953 INFO [0x7000056de000] ClientConnection:263 | [LOCAL_IP:50660 -> REMOTE_IP:6651] Destroyed connection
2022-04-03 19:00:55.136 INFO [0x7000056de000] HandlerBase:64 | [persistent://public/default/topic-1, test-oauth2-50-7] Getting connection from pool
2022-04-03 19:00:55.136 INFO [0x7000056de000] ConnectionPool:86 | Deleting stale connection from pool for SERVICE_URL:6651 use_count: -1 @ 0x0
2022-04-03 19:00:55.137 INFO [0x7000056de000] ClientConnection:189 | [<none> -> SERVICE_URL:6651] Create ClientConnection, timeout=10000
2022-04-03 19:00:55.250 INFO [0x7000056de000] ConnectionPool:96 | Created connection for SERVICE_URL:6651
2022-04-03 19:00:55.702 INFO [0x7000056de000] ClientConnection:375 | [LOCAL_IP:51081 -> REMOTE_IP:6651] Connected to broker
2022-04-03 19:00:57.750 INFO [0x7000056de000] ProducerImpl:189 | [persistent://public/default/topic-1, test-oauth2-50-7] Created producer on broker [LOCAL_IP:50675 -> REMOTE_IP:6651]
```
We can see that a new connection was established after handling the
`ServiceUnitNotReady` error (the local port changed from 50660 to 51081).
(cherry picked from commit 8c7e1d9e31172fed3ebc32ebf0c725226a0eae8e)
---
pulsar-client-cpp/lib/ClientConnection.cc | 48 ++++++++++++++++++++++++-------
pulsar-client-cpp/lib/ClientConnection.h | 3 ++
pulsar-client-cpp/tests/AuthTokenTest.cc | 4 +--
3 files changed, 42 insertions(+), 13 deletions(-)
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index 880218708d3..e17ec39b4a9 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -890,13 +890,16 @@ void ClientConnection::handleIncomingCommand() {
if (partitionMetadataResponse.has_error()) {
LOG_ERROR(cnxString_ << "Failed partition-metadata lookup req_id: "
<< partitionMetadataResponse.request_id()
- << " error: " << partitionMetadataResponse.error());
+ << " error: " << partitionMetadataResponse.error()
+ << " msg: " << partitionMetadataResponse.message());
+ checkServerError(partitionMetadataResponse.error());
+ lookupDataPromise->setFailed(getResult(partitionMetadataResponse.error()));
} else {
LOG_ERROR(cnxString_ << "Failed partition-metadata lookup req_id: "
<< partitionMetadataResponse.request_id()
<< " with empty response: ");
+ lookupDataPromise->setFailed(ResultConnectError);
}
- lookupDataPromise->setFailed(ResultConnectError);
} else {
LookupDataResultPtr lookupResultPtr = std::make_shared<LookupDataResult>();
lookupResultPtr->setPartitions(partitionMetadataResponse.partitions());
@@ -974,13 +977,16 @@ void ClientConnection::handleIncomingCommand() {
if (lookupTopicResponse.has_error()) {
LOG_ERROR(cnxString_
<< "Failed lookup req_id: " << lookupTopicResponse.request_id()
- << " error: " << lookupTopicResponse.error());
+ << " error: " << lookupTopicResponse.error()
+ << " msg: " << lookupTopicResponse.message());
+ checkServerError(lookupTopicResponse.error());
+ lookupDataPromise->setFailed(getResult(lookupTopicResponse.error()));
} else {
LOG_ERROR(cnxString_
<< "Failed lookup req_id: " << lookupTopicResponse.request_id()
<< " with empty response: ");
+ lookupDataPromise->setFailed(ResultConnectError);
}
- lookupDataPromise->setFailed(ResultConnectError);
} else {
LOG_DEBUG(cnxString_
<< "Received lookup response from server. req_id: "
@@ -1486,15 +1492,10 @@ void ClientConnection::close(Result result) {
return;
}
state_ = Disconnected;
- boost::system::error_code err;
- if (socket_) {
- socket_->close(err);
- if (err) {
- LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
- }
- }
+ closeSocket();
if (tlsSocket_) {
+ boost::system::error_code err;
tlsSocket_->lowest_layer().close(err);
if (err) {
LOG_WARN(cnxString_ << "Failed to close TLS socket: " << err.message());
@@ -1639,4 +1640,29 @@ Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(con
return promise.getFuture();
}
+void ClientConnection::closeSocket() {
+ boost::system::error_code err;
+ if (socket_) {
+ socket_->close(err);
+ if (err) {
+ LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
+ }
+ }
+}
+
+void ClientConnection::checkServerError(const proto::ServerError& error) {
+ switch (error) {
+ case proto::ServerError::ServiceNotReady:
+ closeSocket();
+ break;
+ case proto::ServerError::TooManyRequests:
+ // TODO: Implement maxNumberOfRejectedRequestPerConnection like
+ // https://github.com/apache/pulsar/pull/274
+ closeSocket();
+ break;
+ default:
+ break;
+ }
+}
+
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h
index 150a37bc0fd..05e62746eb5 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -339,6 +339,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
friend class PulsarFriend;
bool isTlsAllowInsecureConnection_ = false;
+
+ void closeSocket();
+ void checkServerError(const proto::ServerError& error);
};
} // namespace pulsar
diff --git a/pulsar-client-cpp/tests/AuthTokenTest.cc b/pulsar-client-cpp/tests/AuthTokenTest.cc
index 1b136b093e5..ede5e818d3d 100644
--- a/pulsar-client-cpp/tests/AuthTokenTest.cc
+++ b/pulsar-client-cpp/tests/AuthTokenTest.cc
@@ -176,11 +176,11 @@ TEST(AuthPluginToken, testNoAuth) {
Producer producer;
Result result = client.createProducer(topicName, producer);
- ASSERT_EQ(ResultConnectError, result);
+ ASSERT_EQ(ResultAuthorizationError, result);
Consumer consumer;
result = client.subscribe(topicName, subName, consumer);
- ASSERT_EQ(ResultConnectError, result);
+ ASSERT_EQ(ResultAuthorizationError, result);
}
TEST(AuthPluginToken, testNoAuthWithHttp) {