You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/04/03 05:58:42 UTC
[GitHub] [pulsar] BewareMyPower opened a new pull request #15009: [C++] Fix connection is not closed when broker closes the connection to proxy
BewareMyPower opened a new pull request #15009:
URL: https://github.com/apache/pulsar/pull/15009
### Motivation
Recently we found C++ client might not refresh the OAuth2 token after
the reconnection. It's because when Pulsar proxy is enabled, the
`AuthResponse` might be sent by proxy, which leads to a disconnection
from broker side. See https://github.com/apache/pulsar/blob/94cc46a66fbe322472dbb657803d21320e59079c/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L687
Then, proxy will return a `ServiceNotReady` error to client as the
result of topic lookup. However, in this case, C++ client only completes
the future of lookup with `ResultConnectError`. The `ClientConnection`
object is still cached in the pool and will be used for followed
interactions.
In addition, the `AuthChallenge` request from broker might never be sent
to client when proxy is enabled. So there is no chance for client to
refresh the token unless a new connection is established.
### Modifications
- Like what Java client does, add `checkServerError` method to
`ClientConnection`, which closes the socket for `ServiceNotReady`
error. Here we don't call `close` directly just not to interrupt the
execution of `handleIncomingCommand`. `close` will be called in
`handleRead` because the `err` will be
`boost::asio::error::bad_descriptor`.
- To ensure `ClientConnection::close` can remove itself from the
connection pool, add `remove` method in `ConnectionPool` to remove a
connection by address. Then hold a reference to the pool in
`ClientConnection`. At the beginning of `close()`, remove the
connection from pool.
### Verifying this change
It's hard to mock the `ServiceNotReady` case in unit test. I have tested
this patch in my private env and let the OAuth2 token expire quickly. We
can see the following logs (hosts are hidden):
```
2022-04-03 13:07:55.616 ERROR [0x700006480000] ClientConnection:1002 | [<local-ip>:62059 -> <remote-ip>:6651] Failed lookup req_id: 136 error: ServiceUnitNotReady msg: writeAddress(..) failed: Broken pipe
2022-04-03 13:07:55.616 ERROR [0x700006480000] ClientImpl:443 | Error getting topic partitions metadata: ServiceUnitNotReady
2022-04-03 13:07:55.617 ERROR [0x700006480000] ClientConnection:599 | [<local-ip>:62059 -> <remote-ip>:6651] Read operation failed: Bad file descriptor
2022-04-03 13:07:55.617 INFO [0x700006480000] ConnectionPool:119 | Removing connection from pool for <proxy-url> use_count: 3 @ 0x7f7d3680ec00
2022-04-03 13:07:55.617 INFO [0x700006480000] ClientConnection:1568 | [<local-ip>:62059 -> <remote-ip>:6651] Connection closed
2022-04-03 13:07:55.617 INFO [0x700006480000] ClientConnection:265 | [<local-ip>:62059 -> <remote-ip>:6651] Destroyed connection
2022-04-03 13:07:56.618 INFO [0x113d8d600] ClientConnection:191 | [<none> -> <proxy-url>] Create ClientConnection, timeout=10000
2022-04-03 13:08:00.841 INFO [0x113d8d600] ConnectionPool:97 | Created connection for <proxy-url>
2022-04-03 13:08:01.348 INFO [0x700006480000] ClientConnection:377 | [<local-ip>:62552 -> <remote-ip>:6651] Connected to broker
```
We can see a new connection (with a new port) was established after
handling the `ServiceUnitNotReady` error.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] BewareMyPower commented on pull request #15009: [C++] Fix connection is not closed when broker closes the connection to proxy
Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on pull request #15009:
URL: https://github.com/apache/pulsar/pull/15009#issuecomment-1086840315
In addition to the test, I tried to schedule reconnection to a remote endpoint periodically.
```c++
Producer producer;
auto result = client.createProducer("topic-1", producer);
ASSERT_EQ(result, ResultOk);
for (int i = 0; i < 100000; i++) {
auto state = PulsarFriend::reconnection(producer);
LOG_INFO(i << " reconnection: " << state);
std::this_thread::sleep_for(std::chrono::seconds(1));
}
```
The helper method in `PulsarFriend` is
```c++
static HandlerBase::State reconnection(Producer producer) {
auto impl = std::static_pointer_cast<ProducerImpl>(producer.impl_);
impl->connection_.reset();
impl->state_ = HandlerBase::State::Pending;
HandlerBase::scheduleReconnection(std::static_pointer_cast<HandlerBase>(impl));
for (int i = 0; i < 1000; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::lock_guard<std::mutex> lock(impl->mutex_);
if (impl->state_ == HandlerBase::State::Ready) {
break;
}
}
std::lock_guard<std::mutex> lock(impl->mutex_);
return impl->state_;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org