You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/05/31 02:21:05 UTC

[pulsar] branch master updated: Limit the number of times lookup requests are redirected (#7096)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 04035c5  Limit the number of times lookup requests are redirected (#7096)
04035c5 is described below

commit 04035c59d6b55479a443cc64e8351f4a467cc253
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Sun May 31 11:20:50 2020 +0900

    Limit the number of times lookup requests are redirected (#7096)
    
    Master Issue: #7041
    
    ### Motivation
    
    When a leader broker is restarted, some producers for topics owned by that broker may not be reopened on the new broker. When this happens, message publishing will continue to fail until the client application is restarted.
    
    Investigation showed that the lookup requests sent by the affected producers were being redirected more than 10,000 times between multiple brokers.
    
    When a lookup request is redirected, `BinaryProtoLookupService#findBroker()` is called recursively. Therefore, tens of thousands of redirects will cause a `StackOverflowError`, and `BinaryProtoLookupService#findBroker()` will never complete.
    
    ### Modifications
    
    Limit the number of times a lookup request can be redirected to 100 by default; this maximum is user-configurable via `ClientBuilder#maxLookupRedirects`. If the number of redirects exceeds the configured maximum, the lookup fails, but `ConnectionHandler` retries the lookup so that the producer can eventually reconnect to the new broker.
---
 .../apache/pulsar/client/api/ClientBuilder.java    |   9 ++
 .../client/impl/BinaryProtoLookupService.java      |  25 +++-
 .../pulsar/client/impl/ClientBuilderImpl.java      |   6 +
 .../org/apache/pulsar/client/impl/HttpClient.java  |   1 +
 .../client/impl/conf/ClientConfigurationData.java  |   1 +
 .../client/impl/BinaryProtoLookupServiceTest.java  | 128 +++++++++++++++++++++
 .../impl/conf/ConfigurationDataUtilsTest.java      |   3 +
 7 files changed, 168 insertions(+), 5 deletions(-)

diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index e84f8ba..c12e9a1 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -387,6 +387,15 @@ public interface ClientBuilder extends Cloneable {
     ClientBuilder maxLookupRequests(int maxLookupRequests);
 
     /**
+     * Set the maximum number of times a lookup-request to a broker will be redirected.
+     *
+     * @since 2.6.0
+     * @param maxLookupRedirects the maximum number of redirects
+     * @return the client builder instance
+     */
+    ClientBuilder maxLookupRedirects(int maxLookupRedirects);
+
+    /**
      * Set max number of broker-rejected requests in a certain time-frame (30 seconds) after which current connection
      * will be closed and client creates a new connection that give chance to connect a different broker <i>(default:
      * 50)</i>.
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 102f394..d3a7df5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -54,12 +54,14 @@ public class BinaryProtoLookupService implements LookupService {
     private final ServiceNameResolver serviceNameResolver;
     private final boolean useTls;
     private final ExecutorService executor;
+    private final int maxLookupRedirects;
 
     public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, boolean useTls, ExecutorService executor)
             throws PulsarClientException {
         this.client = client;
         this.useTls = useTls;
         this.executor = executor;
+        this.maxLookupRedirects = client.getConfiguration().getMaxLookupRedirects();
         this.serviceNameResolver = new PulsarServiceNameResolver();
         updateServiceUrl(serviceUrl);
     }
@@ -77,7 +79,7 @@ public class BinaryProtoLookupService implements LookupService {
      * @return broker-socket-address that serves given topic
      */
     public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(TopicName topicName) {
-        return findBroker(serviceNameResolver.resolveHost(), false, topicName);
+        return findBroker(serviceNameResolver.resolveHost(), false, topicName, 0);
     }
 
     /**
@@ -89,9 +91,15 @@ public class BinaryProtoLookupService implements LookupService {
     }
 
     private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> findBroker(InetSocketAddress socketAddress,
-            boolean authoritative, TopicName topicName) {
+            boolean authoritative, TopicName topicName, final int redirectCount) {
         CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> addressFuture = new CompletableFuture<>();
 
+        if (maxLookupRedirects > 0 && redirectCount > maxLookupRedirects) {
+            addressFuture.completeExceptionally(
+                    new PulsarClientException.LookupException("Too many redirects: " + maxLookupRedirects));
+            return addressFuture;
+        }
+
         client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
             long requestId = client.newRequestId();
             ByteBuf request = Commands.newLookup(topicName.toString(), authoritative, requestId);
@@ -110,13 +118,20 @@ public class BinaryProtoLookupService implements LookupService {
 
                     // (2) redirect to given address if response is: redirect
                     if (lookupDataResult.redirect) {
-                        findBroker(responseBrokerAddress, lookupDataResult.authoritative, topicName)
+                        findBroker(responseBrokerAddress, lookupDataResult.authoritative, topicName, redirectCount + 1)
                                 .thenAccept(addressPair -> {
                                     addressFuture.complete(addressPair);
                                 }).exceptionally((lookupException) -> {
                                     // lookup failed
-                                    log.warn("[{}] lookup failed : {}", topicName.toString(),
-                                            lookupException.getMessage(), lookupException);
+                                    if (redirectCount > 0) {
+                                        if (log.isDebugEnabled()) {
+                                            log.debug("[{}] lookup redirection failed ({}) : {}", topicName.toString(),
+                                                    redirectCount, lookupException.getMessage());
+                                        }
+                                    } else {
+                                        log.warn("[{}] lookup failed : {}", topicName.toString(),
+                                                lookupException.getMessage(), lookupException);
+                                    }
                                     addressFuture.completeExceptionally(lookupException);
                                     return null;
                                 });
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 8d3fdbe..7a56e42 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -241,6 +241,12 @@ public class ClientBuilderImpl implements ClientBuilder {
     }
 
     @Override
+    public ClientBuilder maxLookupRedirects(int maxLookupRedirects) {
+        conf.setMaxLookupRedirects(maxLookupRedirects);
+        return this;
+    }
+
+    @Override
     public ClientBuilder maxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRequestPerConnection) {
         conf.setMaxNumberOfRejectedRequestPerConnection(maxNumberOfRejectedRequestPerConnection);
         return this;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index 0b4ef32..3ba02bb 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -72,6 +72,7 @@ public class HttpClient implements Closeable {
 
         DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
         confBuilder.setFollowRedirect(true);
+        confBuilder.setMaxRedirects(conf.getMaxLookupRedirects());
         confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000);
         confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000);
         confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()));
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 980630c..76bc8d9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -67,6 +67,7 @@ public class ClientConfigurationData implements Serializable, Cloneable {
     private boolean tlsHostnameVerificationEnable = false;
     private int concurrentLookupRequest = 5000;
     private int maxLookupRequest = 50000;
+    private int maxLookupRedirects = 100;
     private int maxNumberOfRejectedRequestPerConnection = 50;
     private int keepAliveIntervalSeconds = 30;
     private int connectionTimeoutMs = 10000;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
new file mode 100644
index 0000000..be09cde
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import io.netty.buffer.ByteBuf;
+
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.client.api.PulsarClientException.LookupException;
+import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.naming.TopicName;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class BinaryProtoLookupServiceTest {
+    private BinaryProtoLookupService lookup;
+    private TopicName topicName;
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        LookupDataResult lookupResult1 = createLookupDataResult("pulsar://broker1.pulsar.apache.org:6650", true);
+        LookupDataResult lookupResult2 = createLookupDataResult("pulsar://broker2.pulsar.apache.org:6650", false);
+
+        CompletableFuture<LookupDataResult> lookupFuture1 = CompletableFuture.completedFuture(lookupResult1);
+        CompletableFuture<LookupDataResult> lookupFuture2 = CompletableFuture.completedFuture(lookupResult2);
+
+        ClientCnx clientCnx = mock(ClientCnx.class);
+        when(clientCnx.newLookup(any(ByteBuf.class), anyLong())).thenReturn(lookupFuture1, lookupFuture1,
+                lookupFuture2);
+
+        CompletableFuture<ClientCnx> connectionFuture = CompletableFuture.completedFuture(clientCnx);
+
+        ConnectionPool cnxPool = mock(ConnectionPool.class);
+        when(cnxPool.getConnection(any(InetSocketAddress.class))).thenReturn(connectionFuture);
+
+        ClientConfigurationData clientConfig = mock(ClientConfigurationData.class);
+        doReturn(0).when(clientConfig).getMaxLookupRedirects();
+
+        PulsarClientImpl client = mock(PulsarClientImpl.class);
+        doReturn(cnxPool).when(client).getCnxPool();
+        doReturn(clientConfig).when(client).getConfiguration();
+        doReturn(1L).when(client).newRequestId();
+
+        lookup = spy(
+                new BinaryProtoLookupService(client, "pulsar://localhost:6650", false, mock(ExecutorService.class)));
+        topicName = TopicName.get("persistent://tenant1/ns1/t1");
+    }
+
+    @Test(invocationTimeOut = 3000)
+    public void maxLookupRedirectsTest1() throws Exception {
+        Pair<InetSocketAddress, InetSocketAddress> addressPair = lookup.getBroker(topicName).get();
+        assertEquals(addressPair.getLeft().toString(), "broker2.pulsar.apache.org:6650");
+        assertEquals(addressPair.getRight().toString(), "broker2.pulsar.apache.org:6650");
+    }
+
+    @Test(invocationTimeOut = 3000)
+    public void maxLookupRedirectsTest2() throws Exception {
+        Field field = BinaryProtoLookupService.class.getDeclaredField("maxLookupRedirects");
+        field.setAccessible(true);
+        field.set(lookup, 2);
+
+        Pair<InetSocketAddress, InetSocketAddress> addressPair = lookup.getBroker(topicName).get();
+        assertEquals(addressPair.getLeft().toString(), "broker2.pulsar.apache.org:6650");
+        assertEquals(addressPair.getRight().toString(), "broker2.pulsar.apache.org:6650");
+    }
+
+    @Test(invocationTimeOut = 3000)
+    public void maxLookupRedirectsTest3() throws Exception {
+        Field field = BinaryProtoLookupService.class.getDeclaredField("maxLookupRedirects");
+        field.setAccessible(true);
+        field.set(lookup, 1);
+
+        try {
+            lookup.getBroker(topicName).get();
+            fail("should have thrown ExecutionException");
+        } catch (ExecutionException e) {
+            Throwable cause = e.getCause();
+            assertTrue(cause instanceof LookupException);
+            assertEquals(cause.getMessage(), "Too many redirects: 1");
+        }
+    }
+
+    private static LookupDataResult createLookupDataResult(String brokerUrl, boolean redirect) throws Exception {
+        LookupDataResult lookupResult = new LookupDataResult(-1);
+
+        Field brokerUrlField = LookupDataResult.class.getDeclaredField("brokerUrl");
+        brokerUrlField.setAccessible(true);
+        brokerUrlField.set(lookupResult, brokerUrl);
+
+        Field redirectField = LookupDataResult.class.getDeclaredField("redirect");
+        redirectField.setAccessible(true);
+        redirectField.set(lookupResult, redirect);
+
+        return lookupResult;
+    }
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
index b144f71..9634a87 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java
@@ -43,13 +43,16 @@ public class ConfigurationDataUtilsTest {
         ClientConfigurationData confData = new ClientConfigurationData();
         confData.setServiceUrl("pulsar://unknown:6650");
         confData.setMaxLookupRequest(600);
+        confData.setMaxLookupRedirects(10);
         confData.setNumIoThreads(33);
         Map<String, Object> config = new HashMap<>();
         config.put("serviceUrl", "pulsar://localhost:6650");
         config.put("maxLookupRequest", 70000);
+        config.put("maxLookupRedirects", 50);
         confData = ConfigurationDataUtils.loadData(config, confData, ClientConfigurationData.class);
         assertEquals("pulsar://localhost:6650", confData.getServiceUrl());
         assertEquals(70000, confData.getMaxLookupRequest());
+        assertEquals(50, confData.getMaxLookupRedirects());
         assertEquals(33, confData.getNumIoThreads());
     }