You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/09/24 08:06:29 UTC

[pulsar] branch branch-2.5 updated: [Issue 8093]Fix client lookup hangs when broker restarts (#8101)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new 61a7bd6  [Issue 8093]Fix client lookup hangs when broker restarts (#8101)
61a7bd6 is described below

commit 61a7bd6e86078041037bd2cca87e3f28d82f4efa
Author: 4onni <ch...@gmail.com>
AuthorDate: Thu Sep 24 15:03:59 2020 +0800

    [Issue 8093]Fix client lookup hangs when broker restarts (#8101)
    
    Fixes #8093
    
    The client hangs forever when all brokers stop and then restart.
    Several steps must be completed before the broker is fully started, as illustrated in the pseudocode below:
    
    ```
    PulsarService#start():
        broker.start(); // Step 1
        webService.start(); // Step 2
        leaderElectionService.start(); //Step 3
    ```
    If a lookup request arrives between Step 2 and Step 3, an NPE is thrown, which prevents all subsequent requests from being processed properly.
    
    With this fix, lookups are rejected with an error until the leader election has completed, so the client can only connect to the broker after the election succeeds.
    
    This change added tests and can be verified as follows:
     - Added 2 test cases in `LeaderElectionServiceTest`
    (cherry picked from commit 65cf9c095ea20c0a546e77458274848094131163)
---
 .../broker/loadbalance/LeaderElectionService.java  |   8 +
 .../pulsar/broker/namespace/NamespaceService.java  |   5 +
 .../loadbalance/LeaderElectionServiceTest.java     | 192 +++++++++++++++++++++
 3 files changed, 205 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
index eb9747f..8904b4f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LeaderElectionService.java
@@ -54,6 +54,8 @@ public class LeaderElectionService {
 
     private boolean stopped = true;
 
+    private volatile boolean elected = false; // volatile: set by the election path, read via isElected() by lookups
+
     private final ZooKeeper zkClient;
 
     private final AtomicReference<LeaderBroker> currentLeader = new AtomicReference<LeaderBroker>();
@@ -118,6 +120,7 @@ public class LeaderElectionService {
             LeaderBroker leaderBroker = jsonMapper.readValue(data, LeaderBroker.class);
             currentLeader.set(leaderBroker);
             isLeader.set(false);
+            elected = true;
             leaderListener.brokerIsAFollowerNow();
 
             // If broker comes here it is a follower. Do nothing, wait for the watch to trigger
@@ -135,6 +138,7 @@ public class LeaderElectionService {
                 // Update the current leader and set the flag to true
                 currentLeader.set(new LeaderBroker(leaderBroker.getServiceUrl()));
                 isLeader.set(true);
+                elected = true;
 
                 // Notify the listener that this broker is now the leader so that it can collect usage and start load
                 // manager.
@@ -200,4 +204,8 @@ public class LeaderElectionService {
         return isLeader.get();
     }
 
+    /** Whether an election round has finished, i.e. this broker knows the current leader (possibly itself). */
+    public boolean isElected() {
+        return this.elected;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 039ea6b..c7bf68f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -385,6 +385,11 @@ public class NamespaceService {
 
     private void searchForCandidateBroker(NamespaceBundle bundle,
             CompletableFuture<Optional<LookupResult>> lookupFuture, boolean authoritative) {
+        if( null == pulsar.getLeaderElectionService() || ! pulsar.getLeaderElectionService().isElected()) {
+            LOG.warn("The leader election has not yet been completed! NamespaceBundle[{}]", bundle);
+            lookupFuture.completeExceptionally(new IllegalStateException("The leader election has not yet been completed!"));
+            return;
+        }
         String candidateBroker = null;
         try {
             // check if this is Heartbeat or SLAMonitor namespace
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
new file mode 100644
index 0000000..4691b0e
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LeaderElectionServiceTest.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.loadbalance;
+
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.apache.pulsar.zookeeper.ZooKeeperCache;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+@Slf4j
+public class LeaderElectionServiceTest {
+
+    private LocalBookkeeperEnsemble bkEnsemble;
+    private LeaderElectionService.LeaderListener listener;
+
+    /** Starts a fresh local ZooKeeper/BookKeeper ensemble and a listener that only logs role changes. */
+    @BeforeMethod
+    public void setup() throws Exception {
+        bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
+        bkEnsemble.start();
+        log.info("---- bk started ----");
+        listener = new LeaderElectionService.LeaderListener() {
+            @Override
+            public void brokerIsTheLeaderNow() {
+                log.info("i am a leader");
+            }
+
+            @Override
+            public void brokerIsAFollowerNow() {
+                log.info("i am a follower");
+            }
+        };
+    }
+
+    /** Stops the ensemble started by {@link #setup()}. */
+    @AfterMethod
+    void shutdown() throws Exception {
+        bkEnsemble.stop();
+        log.info("---- bk stopped ----");
+    }
+
+    /** A leader and a follower must both report {@code isElected()} and agree on the leader's URL. */
+    @Test
+    public void electedShouldBeTrue() {
+        final ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
+        try {
+            final String safeWebServiceAddress = "http://localhost:8080";
+            ZooKeeperCache zkCache = Mockito.mock(ZooKeeperCache.class);
+            PulsarService pulsar = Mockito.mock(PulsarService.class);
+            Mockito.when(pulsar.getZkClient()).thenReturn(bkEnsemble.getZkClient());
+            Mockito.when(pulsar.getExecutor()).thenReturn(ses);
+            Mockito.when(pulsar.getSafeWebServiceAddress()).thenReturn(safeWebServiceAddress);
+            Mockito.when(zkCache.getZooKeeper()).thenReturn(bkEnsemble.getZkClient());
+            Mockito.when(pulsar.getLocalZkCache()).thenReturn(zkCache);
+            LeaderElectionService leaderElectionService = new LeaderElectionService(pulsar, listener);
+            leaderElectionService.start();
+            Assert.assertTrue(leaderElectionService.isElected());
+            Assert.assertTrue(leaderElectionService.isLeader());
+            Assert.assertEquals(leaderElectionService.getCurrentLeader().getServiceUrl(), safeWebServiceAddress);
+
+            LeaderElectionService followerElectionService = new LeaderElectionService(pulsar, listener);
+            followerElectionService.start();
+            Assert.assertTrue(followerElectionService.isElected());
+            Assert.assertFalse(followerElectionService.isLeader());
+            Assert.assertEquals(followerElectionService.getCurrentLeader().getServiceUrl(), safeWebServiceAddress);
+        } finally {
+            // The mocked PulsarService never shuts this executor down, so the test has to.
+            ses.shutdownNow();
+        }
+    }
+
+    /** Lookups must fail with a LookupException until the leader election has completed, then succeed. */
+    @Test
+    public void anErrorShouldBeThrowBeforeLeaderElected() throws PulsarServerException, PulsarClientException, PulsarAdminException {
+        final String clusterName = "elect-test";
+        final String tenant = "elect";
+        final String namespace = "ns";
+        final String topic = "persistent://" + tenant + "/" + namespace + "/1p";
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setBrokerServicePort(Optional.of(6650));
+        config.setWebServicePort(Optional.of(8080));
+        config.setClusterName(clusterName);
+        config.setAdvertisedAddress("localhost");
+        config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
+        // Stub once, before start(): re-stubbing a spy that broker threads may call is not thread-safe.
+        java.util.concurrent.atomic.AtomicReference<LeaderElectionService> electionService =
+                new java.util.concurrent.atomic.AtomicReference<>();
+        PulsarService pulsar = Mockito.spy(new MockPulsarService(config));
+        Mockito.doAnswer(invocation -> electionService.get()).when(pulsar).getLeaderElectionService();
+        pulsar.start();
+        try (PulsarAdmin adminClient = PulsarAdmin.builder().serviceHttpUrl("http://localhost:8080").build();
+             PulsarClient client = PulsarClient.builder()
+                     .serviceUrl("pulsar://localhost:6650")
+                     .startingBackoffInterval(1, TimeUnit.MILLISECONDS)
+                     .maxBackoffInterval(100, TimeUnit.MILLISECONDS)
+                     .operationTimeout(1000, TimeUnit.MILLISECONDS)
+                     .build()) {
+            // broker and webService are started, but leaderElectionService is not ready
+            adminClient.clusters().createCluster(clusterName, new ClusterData("http://localhost:8080"));
+            adminClient.tenants().createTenant(tenant, new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(clusterName)));
+            adminClient.namespaces().createNamespace(tenant + "/" + namespace, 16);
+            checkLookupException(tenant, namespace, client);
+
+            // broker, webService and leaderElectionService are started, but the election is not done yet
+            LeaderElectionService notElectedService = Mockito.mock(LeaderElectionService.class);
+            Mockito.when(notElectedService.isElected()).thenReturn(false);
+            electionService.set(notElectedService);
+            checkLookupException(tenant, namespace, client);
+
+            // election is done; the mock is fully stubbed before it is published to broker threads
+            LeaderElectionService electedService = Mockito.mock(LeaderElectionService.class);
+            Mockito.when(electedService.isElected()).thenReturn(true);
+            Mockito.when(electedService.isLeader()).thenReturn(true);
+            Mockito.when(electedService.getCurrentLeader()).thenReturn(new LeaderBroker("http://localhost:8080"));
+            electionService.set(electedService);
+            try (Producer<byte[]> producer = client.newProducer().topic(topic).create()) {
+                Assert.assertEquals(producer.getTopic(), topic);
+            }
+        } finally {
+            pulsar.close();
+        }
+    }
+
+    /** Asserts that creating a producer fails because the broker has not completed its leader election. */
+    private void checkLookupException(String tenant, String namespace, PulsarClient client) {
+        try {
+            client.newProducer().topic("persistent://" + tenant + "/" + namespace + "/1p").create();
+            Assert.fail("Lookup should fail while the leader election has not completed");
+        } catch (PulsarClientException t) {
+            Assert.assertTrue(t instanceof PulsarClientException.LookupException);
+            Assert.assertEquals(t.getMessage(), "java.lang.IllegalStateException: The leader election has not yet been completed!");
+        }
+    }
+
+    /** A PulsarService whose leader election service is never started, emulating the startup window. */
+    private static class MockPulsarService extends PulsarService {
+        public MockPulsarService(ServiceConfiguration config) {
+            super(config);
+        }
+
+        public MockPulsarService(ServiceConfiguration config, Optional<WorkerService> functionWorkerService, Consumer<Integer> processTerminator) {
+            super(config, functionWorkerService, processTerminator);
+        }
+
+        @Override
+        protected void startLeaderElectionService() {
+            // mock
+        }
+    }
+}