You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/09/24 09:19:56 UTC
[pulsar] branch master updated: [Fix] Should not cache the owner
that does not belong to current server (#8111)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 38429e1 [Fix] Should not cache the owner that does not belong to current server (#8111)
38429e1 is described below
commit 38429e1e3583c72bbaf8f856ca87a3ae418df012
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Thu Sep 24 17:19:39 2020 +0800
[Fix] Should not cache the owner that does not belong to current server (#8111)
this PR also solve #8071 , we don't add the incorrect bundle data int the server own cache
### Motivation
modify the get ownership logic to support getOwnerAsync twice can get the same result.
### Modifications
```
org.apache.pulsar.broker.namespace.OwnershipCache;
private CompletableFuture<Optional<Map.Entry<NamespaceEphemeralData, Stat>>> resolveOwnership(String path) {
return ownershipReadOnlyCache.getWithStatAsync(path).thenApply(optionalOwnerDataWithStat -> {
if (optionalOwnerDataWithStat.isPresent()) {
Map.Entry<NamespaceEphemeralData, Stat> ownerDataWithStat = optionalOwnerDataWithStat.get();
Stat stat = ownerDataWithStat.getValue();
if (stat.getEphemeralOwner() == localZkCache.getZooKeeper().getSessionId()) {
LOG.info("Successfully reestablish ownership of {}", path);
OwnedBundle ownedBundle = new OwnedBundle(ServiceUnitZkUtils.suBundleFromPath(path, bundleFactory));
if (selfOwnerInfo.getNativeUrl().equals(ownerDataWithStat.getKey().getNativeUrl())) {
ownedBundlesCache.put(path, CompletableFuture.completedFuture(ownedBundle));
}
ownershipReadOnlyCache.invalidate(path);
namespaceService.onNamespaceBundleOwned(ownedBundle.getNamespaceBundle());
}
}
return optionalOwnerDataWithStat;
});
}
```
---
.../pulsar/broker/namespace/OwnershipCache.java | 4 +-
.../OwnerShipCacheForCurrentServerTest.java | 64 +++++
.../OwnerShipForCurrentServerTestBase.java | 272 +++++++++++++++++++++
3 files changed, 339 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index 35dd712..18ba9e2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -185,7 +185,9 @@ public class OwnershipCache {
if (stat.getEphemeralOwner() == localZkCache.getZooKeeper().getSessionId()) {
LOG.info("Successfully reestablish ownership of {}", path);
OwnedBundle ownedBundle = new OwnedBundle(ServiceUnitZkUtils.suBundleFromPath(path, bundleFactory));
- ownedBundlesCache.put(path, CompletableFuture.completedFuture(ownedBundle));
+ if (selfOwnerInfo.getNativeUrl().equals(ownerDataWithStat.getKey().getNativeUrl())) {
+ ownedBundlesCache.put(path, CompletableFuture.completedFuture(ownedBundle));
+ }
ownershipReadOnlyCache.invalidate(path);
namespaceService.onNamespaceBundleOwned(ownedBundle.getNamespaceBundle());
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipCacheForCurrentServerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipCacheForCurrentServerTest.java
new file mode 100644
index 0000000..688fdb0
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipCacheForCurrentServerTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.namespace;
+
+import com.google.common.collect.Sets;
+import org.apache.pulsar.common.naming.NamespaceBundle;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class OwnerShipCacheForCurrentServerTest extends OwnerShipForCurrentServerTestBase {
+
+ private final static String TENANT = "ownership";
+ private final static String NAMESPACE = TENANT + "/ns1";
+ private final static String TOPIC_TEST = NAMESPACE + "/test";
+
+ @BeforeMethod
+ protected void setup() throws Exception {
+ internalSetup();
+ String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":");
+ String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1];
+ admin.clusters().createCluster(CLUSTER_NAME, new ClusterData("http://localhost:" + webServicePort));
+ admin.tenants().createTenant(TENANT,
+ new TenantInfo(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
+ admin.namespaces().createNamespace(NAMESPACE);
+ admin.topics().createNonPartitionedTopic(TOPIC_TEST);
+ }
+
+ @AfterMethod
+ protected void cleanup() {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testOwnershipForCurrentServer() throws Exception {
+ NamespaceService[] namespaceServices = new NamespaceService[getPulsarServiceList().size()];
+ for (int i = 0; i < getPulsarServiceList().size(); i++) {
+ namespaceServices[i] = getPulsarServiceList().get(i).getNamespaceService();
+ NamespaceBundle bundle = namespaceServices[i].getBundle(TopicName.get(TOPIC_TEST));
+ Assert.assertEquals(namespaceServices[i].getOwnerAsync(bundle).get().get().getNativeUrl(),
+ namespaceServices[i].getOwnerAsync(bundle).get().get().getNativeUrl());
+ }
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
new file mode 100644
index 0000000..98d4277
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipForCurrentServerTestBase.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.namespace;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
+import org.apache.bookkeeper.client.PulsarMockBookKeeper;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.pulsar.broker.BookKeeperClientFactory;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.SameThreadOrderedSafeExecutor;
+import org.apache.pulsar.broker.intercept.CounterBrokerInterceptor;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
+import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.MockZooKeeper;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import static org.mockito.Mockito.*;
+
+@Slf4j
+public class OwnerShipForCurrentServerTestBase {
+
+ public final static String CLUSTER_NAME = "test";
+
+ @Setter
+ private int brokerCount = 3;
+
+ private final List<SameThreadOrderedSafeExecutor> orderedExecutorList = new ArrayList<>();
+ @Getter
+ private final List<ServiceConfiguration> serviceConfigurationList = new ArrayList<>();
+ @Getter
+ private final List<PulsarService> pulsarServiceList = new ArrayList<>();
+
+ protected PulsarAdmin admin;
+ protected PulsarClient pulsarClient;
+
+ private MockZooKeeper mockZooKeeper;
+ private ExecutorService bkExecutor;
+ private NonClosableMockBookKeeper mockBookKeeper;
+
+ public void internalSetup() throws Exception {
+ init();
+
+ admin = spy(PulsarAdmin.builder().serviceHttpUrl(pulsarServiceList.get(0).getWebServiceAddress()).build());
+
+ pulsarClient = PulsarClient.builder().serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl()).build();
+ }
+
+ private void init() throws Exception {
+ mockZooKeeper = createMockZooKeeper();
+
+ bkExecutor = Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setNameFormat("mock-pulsar-bk")
+ .setUncaughtExceptionHandler((thread, ex) -> log.info("Uncaught exception", ex))
+ .build());
+ mockBookKeeper = createMockBookKeeper(mockZooKeeper, bkExecutor);
+ startBroker();
+ }
+
+ protected void startBroker() throws Exception {
+ for (int i = 0; i < brokerCount; i++) {
+ ServiceConfiguration conf = new ServiceConfiguration();
+ conf.setClusterName(CLUSTER_NAME);
+ conf.setAdvertisedAddress("localhost");
+ conf.setManagedLedgerCacheSizeMB(8);
+ conf.setActiveConsumerFailoverDelayTimeMillis(0);
+ conf.setDefaultNumberOfNamespaceBundles(1);
+ conf.setZookeeperServers("localhost:2181");
+ conf.setConfigurationStoreServers("localhost:3181");
+ conf.setAllowAutoTopicCreationType("non-partitioned");
+ conf.setBookkeeperClientExposeStatsToPrometheus(true);
+ conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
+
+ conf.setBrokerServicePort(Optional.of(0));
+ conf.setBrokerServicePortTls(Optional.of(0));
+ conf.setAdvertisedAddress("localhost");
+ conf.setWebServicePort(Optional.of(0));
+ conf.setWebServicePortTls(Optional.of(0));
+ serviceConfigurationList.add(conf);
+
+ PulsarService pulsar = spy(new PulsarService(conf));
+
+ setupBrokerMocks(pulsar);
+ pulsar.start();
+ pulsarServiceList.add(pulsar);
+ }
+ }
+
+ protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
+ // Override default providers with mocked ones
+ doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory();
+ doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
+
+ Supplier<NamespaceService> namespaceServiceSupplier = () -> spy(new NamespaceService(pulsar));
+ doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();
+
+ SameThreadOrderedSafeExecutor executor = new SameThreadOrderedSafeExecutor();
+ orderedExecutorList.add(executor);
+ doReturn(executor).when(pulsar).getOrderedExecutor();
+ doReturn(new CounterBrokerInterceptor()).when(pulsar).getBrokerInterceptor();
+
+ doAnswer((invocation) -> spy(invocation.callRealMethod())).when(pulsar).newCompactor();
+ }
+
+ public static MockZooKeeper createMockZooKeeper() throws Exception {
+ MockZooKeeper zk = MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService());
+ List<ACL> dummyAclList = new ArrayList<>(0);
+
+ ZkUtils.createFullPathOptimistic(zk, "/ledgers/available/192.168.1.1:" + 5000,
+ "".getBytes(ZookeeperClientFactoryImpl.ENCODING_SCHEME), dummyAclList, CreateMode.PERSISTENT);
+
+ zk.create("/ledgers/LAYOUT", "1\nflat:1".getBytes(ZookeeperClientFactoryImpl.ENCODING_SCHEME), dummyAclList,
+ CreateMode.PERSISTENT);
+ return zk;
+ }
+
+ public static NonClosableMockBookKeeper createMockBookKeeper(ZooKeeper zookeeper,
+ ExecutorService executor) throws Exception {
+ return spy(new NonClosableMockBookKeeper(zookeeper, executor));
+ }
+
+ // Prevent the MockBookKeeper instance from being closed when the broker is restarted within a test
+ public static class NonClosableMockBookKeeper extends PulsarMockBookKeeper {
+
+ public NonClosableMockBookKeeper(ZooKeeper zk, ExecutorService executor) throws Exception {
+ super(zk, executor);
+ }
+
+ @Override
+ public void close() {
+ // no-op
+ }
+
+ @Override
+ public void shutdown() {
+ // no-op
+ }
+
+ public void reallyShutdown() {
+ super.shutdown();
+ }
+ }
+
+ protected ZooKeeperClientFactory mockZooKeeperClientFactory = new ZooKeeperClientFactory() {
+
+ @Override
+ public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessionType,
+ int zkSessionTimeoutMillis) {
+ // Always return the same instance (so that we don't loose the mock ZK content on broker restart
+ return CompletableFuture.completedFuture(mockZooKeeper);
+ }
+ };
+
+ private final BookKeeperClientFactory mockBookKeeperClientFactory = new BookKeeperClientFactory() {
+
+ @Override
+ public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
+ Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+ Map<String, Object> properties) {
+ // Always return the same instance (so that we don't loose the mock BK content on broker restart
+ return mockBookKeeper;
+ }
+
+ @Override
+ public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient,
+ Optional<Class<? extends EnsemblePlacementPolicy>> ensemblePlacementPolicyClass,
+ Map<String, Object> properties, StatsLogger statsLogger) {
+ // Always return the same instance (so that we don't loose the mock BK content on broker restart
+ return mockBookKeeper;
+ }
+
+ @Override
+ public void close() {
+ // no-op
+ }
+ };
+
+ protected final void internalCleanup() {
+ try {
+ // if init fails, some of these could be null, and if so would throw
+ // an NPE in shutdown, obscuring the real error
+ if (admin != null) {
+ admin.close();
+ admin = null;
+ }
+ if (pulsarClient != null) {
+ pulsarClient.shutdown();
+ pulsarClient = null;
+ }
+ if (pulsarServiceList.size() > 0) {
+ for (PulsarService pulsarService : pulsarServiceList) {
+ pulsarService.close();
+ }
+ pulsarServiceList.clear();
+ }
+ if (serviceConfigurationList.size() > 0) {
+ serviceConfigurationList.clear();
+ }
+ if (mockBookKeeper != null) {
+ mockBookKeeper.reallyShutdown();
+ }
+ if (mockZooKeeper != null) {
+ mockZooKeeper.shutdown();
+ }
+ if (orderedExecutorList.size() > 0) {
+ for (int i = 0; i < orderedExecutorList.size(); i++) {
+ SameThreadOrderedSafeExecutor sameThreadOrderedSafeExecutor = orderedExecutorList.get(i);
+ if(sameThreadOrderedSafeExecutor != null) {
+ try {
+ sameThreadOrderedSafeExecutor.shutdownNow();
+ sameThreadOrderedSafeExecutor.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException ex) {
+ log.error("sameThreadOrderedSafeExecutor shutdown had error", ex);
+ Thread.currentThread().interrupt();
+ }
+ orderedExecutorList.set(i, null);
+ }
+ }
+ }
+ if(bkExecutor != null) {
+ try {
+ bkExecutor.shutdownNow();
+ bkExecutor.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException ex) {
+ log.error("bkExecutor shutdown had error", ex);
+ Thread.currentThread().interrupt();
+ }
+ bkExecutor = null;
+ }
+ } catch (Exception e) {
+ log.warn("Failed to clean up mocked pulsar service:", e);
+ }
+ }
+
+}