You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/04/05 14:08:44 UTC

[pulsar] branch branch-2.8 updated: [C++] Fix connection is not closed when broker closes the connection to proxy (#15009)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new d705b534d41 [C++] Fix connection is not closed when broker closes the connection to proxy (#15009)
d705b534d41 is described below

commit d705b534d41174740236569f2f804af4485b7bfe
Author: Yunze Xu <xy...@163.com>
AuthorDate: Mon Apr 4 18:47:16 2022 +0800

    [C++] Fix connection is not closed when broker closes the connection to proxy (#15009)
    
    ### Motivation
    
    We recently found that the C++ client might not refresh the OAuth2 token
    after reconnecting. This is because, when the Pulsar proxy is enabled,
    the `AuthResponse` might be sent by the proxy, which leads to a
    disconnection on the broker side. See https://github.com/apache/pulsar/blob/94cc46a66fbe322472dbb657803d21320e59079c/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L687
    
    Then, the proxy returns a `ServiceNotReady` error to the client as the
    result of the topic lookup. See:
    
    https://github.com/apache/pulsar/blob/d1a302e60a8f61615800679522016f21dcd01295/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java#L150-L154
    
    https://github.com/apache/pulsar/blob/d1a302e60a8f61615800679522016f21dcd01295/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/LookupProxyHandler.java#L242-L247
    
    However, in this case, the C++ client only completes the lookup future
    with `ResultConnectError`. The `ClientConnection` object remains cached
    in the pool and is reused for subsequent interactions.
    
    ### Modifications
    
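    As the Java client does, add a `checkServerError` method to
    `ClientConnection` that closes the socket on a `ServiceNotReady` error
    (and also on `TooManyRequests`). We don't call `close` directly here so
    as not to interrupt the execution of `handleIncomingCommand`. Instead,
    `close` is called from `handleRead`, because the pending read then fails
    with `boost::asio::error::bad_descriptor`.
    
    In addition, a failed (partitioned) lookup now completes its future with
    the `Result` mapped from the server error instead of always using
    `ResultConnectError`. This is why `AuthTokenTest` now expects
    `ResultAuthorizationError`.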
    
    ### Verifying this change
    
    It's hard to mock the `ServiceNotReady` case in a unit test. I tested
    this patch in my private environment with an OAuth2 token configured to
    expire quickly. The resulting logs are shown below (hosts are hidden):
    
    ```
    2022-04-03 19:00:54.357 INFO  [0x110af6600] HandlerBase:142 | [persistent://public/default/topic-1, test-oauth2-50-7] Schedule reconnection in 0.1 s
    2022-04-03 19:00:54.458 INFO  [0x7000056de000] HandlerBase:64 | [persistent://public/default/topic-1, test-oauth2-50-7] Getting connection from pool
    2022-04-03 19:00:54.953 ERROR [0x7000056de000] ClientConnection:1001 | [LOCAL_IP:50660 -> REMOTE_IP:6651] Failed lookup req_id: 90 error: ServiceUnitNotReady msg: Disconnected from server at test-oauth2-broker.sndev.svc.cluster.local/172.20.116.73:6650
    2022-04-03 19:00:54.953 INFO  [0x7000056de000] HandlerBase:142 | [persistent://public/default/topic-1, test-oauth2-50-7] Schedule reconnection in 0.182 s
    2022-04-03 19:00:54.953 ERROR [0x7000056de000] ClientConnection:597 | [LOCAL_IP:50660 -> REMOTE_IP:6651] Read operation failed: Bad file descriptor
    2022-04-03 19:00:54.953 INFO  [0x7000056de000] ClientConnection:1560 | [LOCAL_IP:50660 -> REMOTE_IP:6651] Connection closed
    2022-04-03 19:00:54.953 INFO  [0x7000056de000] ClientConnection:263 | [LOCAL_IP:50660 -> REMOTE_IP:6651] Destroyed connection
    2022-04-03 19:00:55.136 INFO  [0x7000056de000] HandlerBase:64 | [persistent://public/default/topic-1, test-oauth2-50-7] Getting connection from pool
    2022-04-03 19:00:55.136 INFO  [0x7000056de000] ConnectionPool:86 | Deleting stale connection from pool for SERVICE_URL:6651 use_count: -1 @ 0x0
    2022-04-03 19:00:55.137 INFO  [0x7000056de000] ClientConnection:189 | [<none> -> SERVICE_URL:6651] Create ClientConnection, timeout=10000
    2022-04-03 19:00:55.250 INFO  [0x7000056de000] ConnectionPool:96 | Created connection for SERVICE_URL:6651
    2022-04-03 19:00:55.702 INFO  [0x7000056de000] ClientConnection:375 | [LOCAL_IP:51081 -> REMOTE_IP:6651] Connected to broker
    2022-04-03 19:00:57.750 INFO  [0x7000056de000] ProducerImpl:189 | [persistent://public/default/topic-1, test-oauth2-50-7] Created producer on broker [LOCAL_IP:50675 -> REMOTE_IP:6651]
    ```
    
    We can see that a new connection was established after handling the
    `ServiceUnitNotReady` error (the local port changed from 50660 to 51081).
    
    (cherry picked from commit 8c7e1d9e31172fed3ebc32ebf0c725226a0eae8e)
---
 pulsar-client-cpp/lib/ClientConnection.cc | 48 ++++++++++++++++++++++++-------
 pulsar-client-cpp/lib/ClientConnection.h  |  3 ++
 pulsar-client-cpp/tests/AuthTokenTest.cc  |  4 +--
 3 files changed, 42 insertions(+), 13 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index 880218708d3..e17ec39b4a9 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -890,13 +890,16 @@ void ClientConnection::handleIncomingCommand() {
                             if (partitionMetadataResponse.has_error()) {
                                 LOG_ERROR(cnxString_ << "Failed partition-metadata lookup req_id: "
                                                      << partitionMetadataResponse.request_id()
-                                                     << " error: " << partitionMetadataResponse.error());
+                                                     << " error: " << partitionMetadataResponse.error()
+                                                     << " msg: " << partitionMetadataResponse.message());
+                                checkServerError(partitionMetadataResponse.error());
+                                lookupDataPromise->setFailed(getResult(partitionMetadataResponse.error()));
                             } else {
                                 LOG_ERROR(cnxString_ << "Failed partition-metadata lookup req_id: "
                                                      << partitionMetadataResponse.request_id()
                                                      << " with empty response: ");
+                                lookupDataPromise->setFailed(ResultConnectError);
                             }
-                            lookupDataPromise->setFailed(ResultConnectError);
                         } else {
                             LookupDataResultPtr lookupResultPtr = std::make_shared<LookupDataResult>();
                             lookupResultPtr->setPartitions(partitionMetadataResponse.partitions());
@@ -974,13 +977,16 @@ void ClientConnection::handleIncomingCommand() {
                             if (lookupTopicResponse.has_error()) {
                                 LOG_ERROR(cnxString_
                                           << "Failed lookup req_id: " << lookupTopicResponse.request_id()
-                                          << " error: " << lookupTopicResponse.error());
+                                          << " error: " << lookupTopicResponse.error()
+                                          << " msg: " << lookupTopicResponse.message());
+                                checkServerError(lookupTopicResponse.error());
+                                lookupDataPromise->setFailed(getResult(lookupTopicResponse.error()));
                             } else {
                                 LOG_ERROR(cnxString_
                                           << "Failed lookup req_id: " << lookupTopicResponse.request_id()
                                           << " with empty response: ");
+                                lookupDataPromise->setFailed(ResultConnectError);
                             }
-                            lookupDataPromise->setFailed(ResultConnectError);
                         } else {
                             LOG_DEBUG(cnxString_
                                       << "Received lookup response from server. req_id: "
@@ -1486,15 +1492,10 @@ void ClientConnection::close(Result result) {
         return;
     }
     state_ = Disconnected;
-    boost::system::error_code err;
-    if (socket_) {
-        socket_->close(err);
-        if (err) {
-            LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
-        }
-    }
 
+    closeSocket();
     if (tlsSocket_) {
+        boost::system::error_code err;
         tlsSocket_->lowest_layer().close(err);
         if (err) {
             LOG_WARN(cnxString_ << "Failed to close TLS socket: " << err.message());
@@ -1639,4 +1640,29 @@ Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(con
     return promise.getFuture();
 }
 
+void ClientConnection::closeSocket() {
+    boost::system::error_code err;
+    if (socket_) {
+        socket_->close(err);
+        if (err) {
+            LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
+        }
+    }
+}
+
+void ClientConnection::checkServerError(const proto::ServerError& error) {
+    switch (error) {
+        case proto::ServerError::ServiceNotReady:
+            closeSocket();
+            break;
+        case proto::ServerError::TooManyRequests:
+            // TODO: Implement maxNumberOfRejectedRequestPerConnection like
+            // https://github.com/apache/pulsar/pull/274
+            closeSocket();
+            break;
+        default:
+            break;
+    }
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h
index 150a37bc0fd..05e62746eb5 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -339,6 +339,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
     friend class PulsarFriend;
 
     bool isTlsAllowInsecureConnection_ = false;
+
+    void closeSocket();
+    void checkServerError(const proto::ServerError& error);
 };
 }  // namespace pulsar
 
diff --git a/pulsar-client-cpp/tests/AuthTokenTest.cc b/pulsar-client-cpp/tests/AuthTokenTest.cc
index 1b136b093e5..ede5e818d3d 100644
--- a/pulsar-client-cpp/tests/AuthTokenTest.cc
+++ b/pulsar-client-cpp/tests/AuthTokenTest.cc
@@ -176,11 +176,11 @@ TEST(AuthPluginToken, testNoAuth) {
 
     Producer producer;
     Result result = client.createProducer(topicName, producer);
-    ASSERT_EQ(ResultConnectError, result);
+    ASSERT_EQ(ResultAuthorizationError, result);
 
     Consumer consumer;
     result = client.subscribe(topicName, subName, consumer);
-    ASSERT_EQ(ResultConnectError, result);
+    ASSERT_EQ(ResultAuthorizationError, result);
 }
 
 TEST(AuthPluginToken, testNoAuthWithHttp) {