You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/09/24 09:19:56 UTC

[pulsar] branch master updated: [Fix] Should not cache the owner that does not belong to current server (#8111)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 38429e1  [Fix] Should not cache the owner that does not belong to current server (#8111)
38429e1 is described below

commit 38429e1e3583c72bbaf8f856ca87a3ae418df012
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Thu Sep 24 17:19:39 2020 +0800

    [Fix] Should not cache the owner that does not belong to current server (#8111)
    
    this PR also solve #8071 , we don't add the incorrect bundle data int the server own cache
    
    ### Motivation
    modify the get ownership logic to support getOwnerAsync twice can get the same result.
    
    
    ### Modifications
    
    ```
    org.apache.pulsar.broker.namespace.OwnershipCache;
    
        private CompletableFuture<Optional<Map.Entry<NamespaceEphemeralData, Stat>>> resolveOwnership(String path) {
            return ownershipReadOnlyCache.getWithStatAsync(path).thenApply(optionalOwnerDataWithStat -> {
                if (optionalOwnerDataWithStat.isPresent()) {
                    Map.Entry<NamespaceEphemeralData, Stat> ownerDataWithStat = optionalOwnerDataWithStat.get();
                    Stat stat = ownerDataWithStat.getValue();
                    if (stat.getEphemeralOwner() == localZkCache.getZooKeeper().getSessionId()) {
                        LOG.info("Successfully reestablish ownership of {}", path);
                        OwnedBundle ownedBundle = new OwnedBundle(ServiceUnitZkUtils.suBundleFromPath(path, bundleFactory));
                        if (selfOwnerInfo.getNativeUrl().equals(ownerDataWithStat.getKey().getNativeUrl())) {
                            ownedBundlesCache.put(path, CompletableFuture.completedFuture(ownedBundle));
                        }
                        ownershipReadOnlyCache.invalidate(path);
                        namespaceService.onNamespaceBundleOwned(ownedBundle.getNamespaceBundle());
                    }
                }
                return optionalOwnerDataWithStat;
            });
        }
    
    ```
---
 .../pulsar/broker/namespace/OwnershipCache.java    |   4 +-
 .../OwnerShipCacheForCurrentServerTest.java        |  64 +++++
 .../OwnerShipForCurrentServerTestBase.java         | 272 +++++++++++++++++++++
 3 files changed, 339 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index 35dd712..18ba9e2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -185,7 +185,9 @@ public class OwnershipCache {
                 if (stat.getEphemeralOwner() == localZkCache.getZooKeeper().getSessionId()) {
                     LOG.info("Successfully reestablish ownership of {}", path);
                     OwnedBundle ownedBundle = new OwnedBundle(ServiceUnitZkUtils.suBundleFromPath(path, bundleFactory));
-                    ownedBundlesCache.put(path, CompletableFuture.completedFuture(ownedBundle));
+                    if (selfOwnerInfo.getNativeUrl().equals(ownerDataWithStat.getKey().getNativeUrl())) {
+                        ownedBundlesCache.put(path, CompletableFuture.completedFuture(ownedBundle));
+                    }
                     ownershipReadOnlyCache.invalidate(path);
                     namespaceService.onNamespaceBundleOwned(ownedBundle.getNamespaceBundle());
                 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipCacheForCurrentServerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipCacheForCurrentServerTest.java
new file mode 100644
index 0000000..688fdb0
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipCacheForCurrentServerTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.namespace;
+
+import com.google.common.collect.Sets;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class OwnerShipCacheForCurrentServerTest extends OwnerShipForCurrentServerTestBase {
+
+    private final static String TENANT = "ownership";
+    private final static String NAMESPACE = TENANT + "/ns1";
+    private final static String TOPIC_TEST = NAMESPACE + "/test";
+
+    @BeforeMethod
+    protected void setup() throws Exception {
+        internalSetup();
+        String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
+        String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1];
+        admin.clusters().createCluster(CLUSTER_NAME, new ClusterData("http://localhost:" + webServicePort));
+        admin.tenants().createTenant(TENANT,
+                new TenantInfo(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
+        admin.namespaces().createNamespace(NAMESPACE);
+        admin.topics().createNonPartitionedTopic(TOPIC_TEST);
+    }
+
+    @AfterMethod
+    protected void cleanup() {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testOwnershipForCurrentServer() throws Exception {
+        NamespaceService[] namespaceServices = new NamespaceService[getPulsarServiceList().size()];
+        for (int i = 0; i < getPulsarServiceList().size(); i++) {
+            namespaceServices[i] = getPulsarServiceList().get(i).getNamespaceService();
+            NamespaceBundle bundle = namespaceServices[i].getBundle(TopicName.get(TOPIC_TEST));
+            Assert.assertEquals(namespaceServices[i].getOwnerAsync(bundle).get().get().getNativeUrl(),
+                    namespaceServices[i].getOwnerAsync(bundle).get().get().getNativeUrl());
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
new file mode 100644
index 0000000..98d4277
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.namespace;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.pulsar.broker.BookKeeperClientFactory;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.SameThreadOrderedSafeExecutor;
+import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
+import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.MockZooKeeper;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static org.mockito.Mockito.*;
+
+@Slf4j
+public class OwnerShipForCurrentServerTestBase {
+
+    public final static String CLUSTER_NAME = "test";
+
+    @Setter
+    private int brokerCount = 3;
+
+    private final List<SameThreadOrderedSafeExecutor> orderedExecutorList = new ArrayList<>();
+    @Getter
+    private final List<ServiceConfiguration> serviceConfigurationList = new ArrayList<>();
+    @Getter
+    private final List<PulsarService> pulsarServiceList = new ArrayList<>();
+
+    protected PulsarAdmin admin;
+    protected PulsarClient pulsarClient;
+
+    private MockZooKeeper mockZooKeeper;
+    private ExecutorService bkExecutor;
+    private NonClosableMockBookKeeper mockBookKeeper;
+
+    public void internalSetup() throws Exception {
+        init();
+
+        admin = spy(PulsarAdmin.builder().serviceHttpUrl(pulsarServiceList.get(0).getWebServiceAddress()).build());
+
+        pulsarClient = PulsarClient.builder().serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl()).build();
+    }
+
+    private void init() throws Exception {
+        mockZooKeeper = createMockZooKeeper();
+
+        bkExecutor = Executors.newSingleThreadExecutor(
+                new ThreadFactoryBuilder().setNameFormat("mock-pulsar-bk")
+                        .setUncaughtExceptionHandler((thread, ex) -> log.info("Uncaught exception", ex))
+                        .build());
+        mockBookKeeper = createMockBookKeeper(mockZooKeeper, bkExecutor);
+        startBroker();
+    }
+
+    protected void startBroker() throws Exception {
+        for (int i = 0; i < brokerCount; i++) {
+            ServiceConfiguration conf = new ServiceConfiguration();
+            conf.setClusterName(CLUSTER_NAME);
+            conf.setAdvertisedAddress("localhost");
+            conf.setManagedLedgerCacheSizeMB(8);
+            conf.setActiveConsumerFailoverDelayTimeMillis(0);
+            conf.setDefaultNumberOfNamespaceBundles(1);
+            conf.setZookeeperServers("localhost:2181");
+            conf.setConfigurationStoreServers("localhost:3181");
+            conf.setAllowAutoTopicCreationType("non-partitioned");
+            conf.setBookkeeperClientExposeStatsToPrometheus(true);
+            conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
+
+            conf.setBrokerServicePort(Optional.of(0));
+            conf.setBrokerServicePortTls(Optional.of(0));
+            conf.setAdvertisedAddress("localhost");
+            conf.setWebServicePort(Optional.of(0));
+            conf.setWebServicePortTls(Optional.of(0));
+            serviceConfigurationList.add(conf);
+
+            PulsarService pulsar = spy(new PulsarService(conf));
+
+            setupBrokerMocks(pulsar);
+            pulsar.start();
+            pulsarServiceList.add(pulsar);
+        }
+    }
+
+    protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
+        // Override default providers with mocked ones
+        doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory();
+        doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
+
+        Supplier<NamespaceService> namespaceServiceSupplier = () -> spy(new NamespaceService(pulsar));
+        doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();
+
+        SameThreadOrderedSafeExecutor executor = new SameThreadOrderedSafeExecutor();
+        orderedExecutorList.add(executor);
+        doReturn(executor).when(pulsar).getOrderedExecutor();
+        doReturn(new CounterBrokerInterceptor()).when(pulsar).getBrokerInterceptor();
+
+        doAnswer((invocation) -> spy(invocation.callRealMethod())).when(pulsar).newCompactor();
+    }
+
+    public static MockZooKeeper createMockZooKeeper() throws Exception {
+        MockZooKeeper zk = MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService());
+        List<ACL> dummyAclList = new ArrayList<>(0);
+
+        ZkUtils.createFullPathOptimistic(zk, "/ledgers/available/192.168.1.1:" + 5000,
+                "".getBytes(ZookeeperClientFactoryImpl.ENCODING_SCHEME), dummyAclList, CreateMode.PERSISTENT);
+
+        zk.create("/ledgers/LAYOUT", "1\nflat:1".getBytes(ZookeeperClientFactoryImpl.ENCODING_SCHEME), dummyAclList,
+                CreateMode.PERSISTENT);
+        return zk;
+    }
+
+    public static NonClosableMockBookKeeper createMockBookKeeper(ZooKeeper zookeeper,
+                                                                                             ExecutorService executor) throws Exception {
+        return spy(new NonClosableMockBookKeeper(zookeeper, executor));
+    }
+
+    // Prevent the MockBookKeeper instance from being closed when the broker is restarted within a test
+    public static class NonClosableMockBookKeeper extends PulsarMockBookKeeper {
+
+        public NonClosableMockBookKeeper(ZooKeeper zk, ExecutorService executor) throws Exception {
+            super(zk, executor);
+        }
+
+        @Override
+        public void close() {
+            // no-op
+        }
+
+        @Override
+        public void shutdown() {
+            // no-op
+        }
+
+        public void reallyShutdown() {
+            super.shutdown();
+        }
+    }
+
+    protected ZooKeeperClientFactory mockZooKeeperClientFactory = new ZooKeeperClientFactory() {
+
+        @Override
+        public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessionType,
+                                                   int zkSessionTimeoutMillis) {
+            // Always return the same instance (so that we don't loose the mock ZK content on broker restart
+            return CompletableFuture.completedFuture(mockZooKeeper);
+        }
+    };
+
+    private final BookKeeperClientFactory mockBookKeeperClientFactory = new BookKeeperClientFactory() {
+
+        @Override
+        public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
+                                 Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+                                 Map<String, Object> properties) {
+            // Always return the same instance (so that we don't loose the mock BK content on broker restart
+            return mockBookKeeper;
+        }
+
+        @Override
+        public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
+                                 Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+                                 Map<String, Object> properties, StatsLogger statsLogger) {
+            // Always return the same instance (so that we don't loose the mock BK content on broker restart
+            return mockBookKeeper;
+        }
+
+        @Override
+        public void close() {
+            // no-op
+        }
+    };
+
+    protected final void internalCleanup() {
+        try {
+            // if init fails, some of these could be null, and if so would throw
+            // an NPE in shutdown, obscuring the real error
+            if (admin != null) {
+                admin.close();
+                admin = null;
+            }
+            if (pulsarClient != null) {
+                pulsarClient.shutdown();
+                pulsarClient = null;
+            }
+            if (pulsarServiceList.size() > 0) {
+                for (PulsarService pulsarService : pulsarServiceList) {
+                    pulsarService.close();
+                }
+                pulsarServiceList.clear();
+            }
+            if (serviceConfigurationList.size() > 0) {
+                serviceConfigurationList.clear();
+            }
+            if (mockBookKeeper != null) {
+                mockBookKeeper.reallyShutdown();
+            }
+            if (mockZooKeeper != null) {
+                mockZooKeeper.shutdown();
+            }
+            if (orderedExecutorList.size() > 0) {
+                for (int i = 0; i < orderedExecutorList.size(); i++) {
+                    SameThreadOrderedSafeExecutor sameThreadOrderedSafeExecutor = orderedExecutorList.get(i);
+                    if(sameThreadOrderedSafeExecutor != null) {
+                        try {
+                            sameThreadOrderedSafeExecutor.shutdownNow();
+                            sameThreadOrderedSafeExecutor.awaitTermination(5, TimeUnit.SECONDS);
+                        } catch (InterruptedException ex) {
+                            log.error("sameThreadOrderedSafeExecutor shutdown had error", ex);
+                            Thread.currentThread().interrupt();
+                        }
+                        orderedExecutorList.set(i, null);
+                    }
+                }
+            }
+            if(bkExecutor != null) {
+                try {
+                    bkExecutor.shutdownNow();
+                    bkExecutor.awaitTermination(5, TimeUnit.SECONDS);
+                } catch (InterruptedException ex) {
+                    log.error("bkExecutor shutdown had error", ex);
+                    Thread.currentThread().interrupt();
+                }
+                bkExecutor = null;
+            }
+        } catch (Exception e) {
+            log.warn("Failed to clean up mocked pulsar service:", e);
+        }
+    }
+
+}