You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2021/08/31 01:56:44 UTC

[pulsar] branch branch-2.8 updated (00f99bb -> 91bc3cc)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 00f99bb  fix java_test_functions build failed (#11829)
     new 6486a55  Fixed Proxy leaking outbound connections (#11848)
     new 91bc3cc  Handle receiveAsync() failures in MultiTopicsConsumer (#11843)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../client/impl/MultiTopicsConsumerImpl.java       |  4 ++++
 .../pulsar/proxy/server/ProxyConnection.java       | 23 ++++++++++++++++------
 2 files changed, 21 insertions(+), 6 deletions(-)

[pulsar] 02/02: Handle receiveAsync() failures in MultiTopicsConsumer (#11843)

Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 91bc3cc9a9234fa169e8245e9b9f99b452dd64eb
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Aug 30 18:53:36 2021 -0700

    Handle receiveAsync() failures in MultiTopicsConsumer (#11843)
---
 .../java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java   | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 997358d..7588c34 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -269,6 +269,10 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                 // recursion and stack overflow
                 internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer));
             }
+        }).exceptionally(ex -> {
+            log.error("Receive operation failed on consumer {} - Retrying later", consumer, ex);
+            internalPinnedExecutor.schedule(() -> receiveMessageFromConsumer(consumer), 10, TimeUnit.SECONDS);
+            return null;
         });
     }
 

[pulsar] 01/02: Fixed Proxy leaking outbound connections (#11848)

Posted by mm...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 6486a55ca735e01af0b1b2af1b8ec9e9d035cf19
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Aug 30 18:53:22 2021 -0700

    Fixed Proxy leaking outbound connections (#11848)
---
 .../pulsar/proxy/server/ProxyConnection.java       | 23 ++++++++++++++++------
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 3fdb0d5e..25f88102 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -69,6 +69,7 @@ import lombok.Getter;
 public class ProxyConnection extends PulsarHandler implements FutureListener<Void> {
     // ConnectionPool is used by the proxy to issue lookup requests
     private PulsarClientImpl client;
+    private ConnectionPool connectionPool;
     private ProxyService service;
     private Authentication clientAuthentication;
     AuthenticationDataSource authenticationData;
@@ -159,6 +160,14 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
         }
         service.getClientCnxs().remove(this);
         LOG.info("[{}] Connection closed", remoteAddress);
+
+        if (connectionPool != null) {
+            try {
+                connectionPool.close();
+            } catch (Exception e) {
+                LOG.error("Failed to close connection pool {}", e.getMessage(), e);
+            }
+        }
     }
 
     @Override
@@ -297,9 +306,10 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
 
             // authn not enabled, complete
             if (!service.getConfiguration().isAuthenticationEnabled()) {
-                this.client = new PulsarClientImpl(clientConf, service.getWorkerGroup(),
-                    new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
-                        () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion)), service.getTimer());
+                this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
+                        () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion));
+                this.client =
+                        new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer());
 
                 completeConnect();
                 return;
@@ -434,9 +444,10 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi
 
     private PulsarClientImpl createClient(final ClientConfigurationData clientConf, final AuthData clientAuthData,
             final String clientAuthMethod, final int protocolVersion) throws PulsarClientException {
-        return new PulsarClientImpl(clientConf, service.getWorkerGroup(),
-                new ProxyConnectionPool(clientConf, service.getWorkerGroup(), () -> new ProxyClientCnx(clientConf,
-                        service.getWorkerGroup(), clientAuthRole, clientAuthData, clientAuthMethod, protocolVersion)), service.getTimer());
+        this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
+                () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
+                        clientAuthMethod, protocolVersion));
+        return new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer());
     }
 
     private static int getProtocolVersionToAdvertise(CommandConnect connect) {