You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/08/31 01:56:44 UTC
[pulsar] branch branch-2.8 updated (00f99bb -> 91bc3cc)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.
from 00f99bb fix java_test_functions build failed (#11829)
new 6486a55 Fixed Proxy leaking outbound connections (#11848)
new 91bc3cc Handle receiveAsync() failures in MultiTopicsConsumer (#11843)
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../client/impl/MultiTopicsConsumerImpl.java | 4 ++++
.../pulsar/proxy/server/ProxyConnection.java | 23 ++++++++++++++++------
2 files changed, 21 insertions(+), 6 deletions(-)
[pulsar] 02/02: Handle receiveAsync() failures in MultiTopicsConsumer (#11843)
Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 91bc3cc9a9234fa169e8245e9b9f99b452dd64eb
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Aug 30 18:53:36 2021 -0700
Handle receiveAsync() failures in MultiTopicsConsumer (#11843)
---
.../java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 997358d..7588c34 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -269,6 +269,10 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
// recursion and stack overflow
internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer));
}
+ }).exceptionally(ex -> {
+ log.error("Receive operation failed on consumer {} - Retrying later", consumer, ex);
+ internalPinnedExecutor.schedule(() -> receiveMessageFromConsumer(consumer), 10, TimeUnit.SECONDS);
+ return null;
});
}
[pulsar] 01/02: Fixed Proxy leaking outbound connections (#11848)
Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6486a55ca735e01af0b1b2af1b8ec9e9d035cf19
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Aug 30 18:53:22 2021 -0700
Fixed Proxy leaking outbound connections (#11848)
---
.../pulsar/proxy/server/ProxyConnection.java | 23 ++++++++++++++++------
1 file changed, 17 insertions(+), 6 deletions(-)
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 3fdb0d5e..25f88102 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -69,6 +69,7 @@ import lombok.Getter;
public class ProxyConnection extends PulsarHandler implements FutureListener<Void> {
// ConnectionPool is used by the proxy to issue lookup requests
private PulsarClientImpl client;
+ private ConnectionPool connectionPool;
private ProxyService service;
private Authentication clientAuthentication;
AuthenticationDataSource authenticationData;
@@ -159,6 +160,14 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
}
service.getClientCnxs().remove(this);
LOG.info("[{}] Connection closed", remoteAddress);
+
+ if (connectionPool != null) {
+ try {
+ connectionPool.close();
+ } catch (Exception e) {
+ LOG.error("Failed to close connection pool {}", e.getMessage(), e);
+ }
+ }
}
@Override
@@ -297,9 +306,10 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
// authn not enabled, complete
if (!service.getConfiguration().isAuthenticationEnabled()) {
- this.client = new PulsarClientImpl(clientConf, service.getWorkerGroup(),
- new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
- () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion)), service.getTimer());
+ this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
+ () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion));
+ this.client =
+ new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer());
completeConnect();
return;
@@ -434,9 +444,10 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
private PulsarClientImpl createClient(final ClientConfigurationData clientConf, final AuthData clientAuthData,
final String clientAuthMethod, final int protocolVersion) throws PulsarClientException {
- return new PulsarClientImpl(clientConf, service.getWorkerGroup(),
- new ProxyConnectionPool(clientConf, service.getWorkerGroup(), () -> new ProxyClientCnx(clientConf,
- service.getWorkerGroup(), clientAuthRole, clientAuthData, clientAuthMethod, protocolVersion)), service.getTimer());
+ this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
+ () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
+ clientAuthMethod, protocolVersion));
+ return new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer());
}
private static int getProtocolVersionToAdvertise(CommandConnect connect) {