You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/07/28 06:45:11 UTC

[pulsar] branch branch-2.7 updated: [Branch-2.7] Fixed deadlock on metadata cache missing while doing checkReplication (#12484)

This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 32fe2288544 [Branch-2.7] Fixed deadlock on metadata cache missing while doing checkReplication (#12484)
32fe2288544 is described below

commit 32fe228854464504d18de240f719b583cf262042
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Jul 27 23:45:03 2022 -0700

    [Branch-2.7] Fixed deadlock on metadata cache missing while doing checkReplication (#12484)
---
 .../pulsar/broker/service/BrokerService.java       | 210 +++++++++++----------
 .../broker/service/persistent/PersistentTopic.java | 109 ++++++-----
 2 files changed, 172 insertions(+), 147 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 29d1001e4bd..40fa540b9c0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1190,119 +1190,139 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
             Optional<Policies> policies = Optional.empty();
             Optional<LocalPolicies> localPolicies = Optional.empty();
 
-            PersistencePolicies persistencePolicies = null;
-            RetentionPolicies retentionPolicies = null;
-            OffloadPolicies topicLevelOffloadPolicies = null;
+            PersistencePolicies tmpPersistencePolicies = null;
+            RetentionPolicies tmpRetentionPolicies = null;
+            OffloadPolicies tmpTopicLevelOffloadPolicies = null;
 
             if (pulsar.getConfig().isTopicLevelPoliciesEnabled()) {
                 try {
                     TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPolicies(topicName);
                     if (topicPolicies != null) {
-                        persistencePolicies = topicPolicies.getPersistence();
-                        retentionPolicies = topicPolicies.getRetentionPolicies();
-                        topicLevelOffloadPolicies = topicPolicies.getOffloadPolicies();
+                        tmpPersistencePolicies = topicPolicies.getPersistence();
+                        tmpRetentionPolicies = topicPolicies.getRetentionPolicies();
+                        tmpTopicLevelOffloadPolicies = topicPolicies.getOffloadPolicies();
                     }
                 } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
                     log.debug("Topic {} policies have not been initialized yet.", topicName);
                 }
             }
 
-            try {
-                policies = pulsar
-                        .getConfigurationCache().policiesCache().get(AdminResource.path(POLICIES,
-                                namespace.toString()));
-                String path = joinPath(LOCAL_POLICIES_ROOT, topicName.getNamespaceObject().toString());
-                localPolicies = pulsar().getLocalZkCacheService().policiesCache().get(path);
-            } catch (Throwable t) {
-                // Ignoring since if we don't have policies, we fallback on the default
-                log.warn("Got exception when reading persistence policy for {}: {}", topicName, t.getMessage(), t);
-                future.completeExceptionally(t);
-                return;
-            }
-
-            if (persistencePolicies == null) {
-                persistencePolicies = policies.map(p -> p.persistence).orElseGet(
-                        () -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
-                                serviceConfig.getManagedLedgerDefaultWriteQuorum(),
-                                serviceConfig.getManagedLedgerDefaultAckQuorum(),
-                                serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
-            }
+            final PersistencePolicies finalPersistencePolicies = tmpPersistencePolicies;
+            final RetentionPolicies finalRetentionPolicies = tmpRetentionPolicies;
+            final OffloadPolicies finalTopicLevelOffloadPolicies = tmpTopicLevelOffloadPolicies;
+
+
+            CompletableFuture<Optional<Policies>> policiesFuture = pulsar
+                    .getConfigurationCache().policiesCache().getAsync(AdminResource.path(POLICIES,
+                            namespace.toString()));
+            String path = joinPath(LOCAL_POLICIES_ROOT, topicName.getNamespaceObject().toString());
+            CompletableFuture<Optional<LocalPolicies>> localPoliciesFuture =
+                    pulsar().getLocalZkCacheService().policiesCache().getAsync(path);
+
+            policiesFuture.thenCombine(localPoliciesFuture, (optPolicies, optLocalPolicies) -> {
+                PersistencePolicies persistencePolicies = finalPersistencePolicies;
+                RetentionPolicies retentionPolicies = finalRetentionPolicies;
+                OffloadPolicies topicLevelOffloadPolicies = finalTopicLevelOffloadPolicies;
+
+                if (persistencePolicies == null) {
+                    persistencePolicies = optPolicies.map(p -> p.persistence).orElseGet(
+                            () -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
+                                    serviceConfig.getManagedLedgerDefaultWriteQuorum(),
+                                    serviceConfig.getManagedLedgerDefaultAckQuorum(),
+                                    serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
+                }
 
-            if (retentionPolicies == null) {
-                retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
-                        () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
-                                serviceConfig.getDefaultRetentionSizeInMB())
-                );
-            }
+                if (retentionPolicies == null) {
+                    retentionPolicies = optPolicies.map(p -> p.retention_policies).orElseGet(
+                            () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
+                                    serviceConfig.getDefaultRetentionSizeInMB())
+                    );
+                }
 
-            ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
-            managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
-            managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
-            managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
-            if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) {
+                ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+                managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
+                managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
+                managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
+                if (optLocalPolicies.isPresent() && optLocalPolicies.get().bookieAffinityGroup != null) {
+                    managedLedgerConfig
+                            .setBookKeeperEnsemblePlacementPolicyClassName(
+                                    ZkIsolatedBookieEnsemblePlacementPolicy.class);
+                    Map<String, Object> properties = Maps.newHashMap();
+                    properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
+                            optLocalPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupPrimary);
+                    properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+                            optLocalPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupSecondary);
+                    managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+                }
+                managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
+                managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
+                managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
+
+                managedLedgerConfig.setMaxUnackedRangesToPersist(
+                        serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
+                managedLedgerConfig.setMaxUnackedRangesToPersistInZk(
+                        serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper());
+                managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
+                managedLedgerConfig.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
+                        TimeUnit.MINUTES);
+                managedLedgerConfig.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
+                        TimeUnit.MINUTES);
+                managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
+
+                managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
+                        serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
+                managedLedgerConfig.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
+                managedLedgerConfig.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
+                managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
+                managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
+                        serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
+                managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
+                managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
                 managedLedgerConfig
-                        .setBookKeeperEnsemblePlacementPolicyClassName(ZkIsolatedBookieEnsemblePlacementPolicy.class);
-                Map<String, Object> properties = Maps.newHashMap();
-                properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
-                        localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupPrimary);
-                properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
-                        localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupSecondary);
-                managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
-            }
-            managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
-            managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
-            managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
-
-            managedLedgerConfig.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
-            managedLedgerConfig.setMaxUnackedRangesToPersistInZk(serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper());
-            managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
-            managedLedgerConfig.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
-                    TimeUnit.MINUTES);
-            managedLedgerConfig.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
-                    TimeUnit.MINUTES);
-            managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
-
-            managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
-                    serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
-            managedLedgerConfig.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
-            managedLedgerConfig.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
-            managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
-            managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
-                    serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
-            managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
-            managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
-            managedLedgerConfig
-                    .setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
-
-            managedLedgerConfig.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
-            managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
-            managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
-            managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
-            managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
-
-            OffloadPolicies nsLevelOffloadPolicies = policies.map(p -> p.offload_policies).orElse(null);
-            OffloadPolicies offloadPolicies = OffloadPolicies.mergeConfiguration(
-                    topicLevelOffloadPolicies,
-                    OffloadPolicies.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
-                    getPulsar().getConfig().getProperties());
-            if (topicLevelOffloadPolicies != null) {
-                try {
-                    LedgerOffloader topicLevelLedgerOffLoader = pulsar().createManagedLedgerOffloader(offloadPolicies);
-                    managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
-                } catch (PulsarServerException e) {
-                    future.completeExceptionally(e);
-                    return;
+                        .setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
+
+                managedLedgerConfig.setLedgerRolloverTimeout(
+                        serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
+                managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
+                managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
+                managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
+                managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
+
+                OffloadPolicies nsLevelOffloadPolicies = optPolicies.map(p -> p.offload_policies).orElse(null);
+                OffloadPolicies offloadPolicies = OffloadPolicies.mergeConfiguration(
+                        topicLevelOffloadPolicies,
+                        OffloadPolicies.oldPoliciesCompatible(nsLevelOffloadPolicies, optPolicies.orElse(null)),
+                        getPulsar().getConfig().getProperties());
+                if (topicLevelOffloadPolicies != null) {
+                    try {
+                        LedgerOffloader topicLevelLedgerOffLoader =
+                                pulsar().createManagedLedgerOffloader(offloadPolicies);
+                        managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
+                    } catch (PulsarServerException e) {
+                        future.completeExceptionally(e);
+                        return null;
+                    }
+                } else {
+                    // If the topic-level offload policy is null, fall back to the namespace-level offloader
+                    managedLedgerConfig.setLedgerOffloader(
+                            pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
                 }
-            } else {
-                //If the topic level policy is null, use the namespace level
-                managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
-            }
 
-            managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
-            managedLedgerConfig.setNewEntriesCheckDelayInMillis(serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
+                managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
+                        serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
+                managedLedgerConfig.setNewEntriesCheckDelayInMillis(
+                        serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
+
+
+                future.complete(managedLedgerConfig);
+                return null;
+            }).exceptionally(ex -> {
+                log.warn("Got exception when reading persistence policy for {}: {}", topicName, ex.getMessage(), ex);
+                future.completeExceptionally(ex);
+                return null;
+            });
 
 
-            future.complete(managedLedgerConfig);
         }, (exception) -> future.completeExceptionally(exception)));
 
         return future;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 67f17d4a4c3..c2fd32a44fd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -157,6 +157,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     @SuppressWarnings("unused")
     private volatile long usageCount = 0;
 
+
     static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
 
     private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;
@@ -1155,68 +1156,72 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             log.debug("[{}] Checking replication status", name);
         }
 
-        Policies policies = null;
-        try {
-            policies = brokerService.pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, name.getNamespace()))
-                    .orElseThrow(() -> new KeeperException.NoNodeException());
-        } catch (Exception e) {
-            CompletableFuture<Void> future = new CompletableFuture<>();
-            future.completeExceptionally(new ServerMetadataException(e));
-            return future;
-        }
-        //Ignore current broker's config for messageTTL for replication.
-        final int newMessageTTLinSeconds;
-        try {
-            newMessageTTLinSeconds = getMessageTTL();
-        } catch (Exception e) {
-            return FutureUtil.failedFuture(new ServerMetadataException(e));
-        }
+        return brokerService.pulsar().getConfigurationCache().policiesCache()
+                .getAsync(AdminResource.path(POLICIES, name.getNamespace()))
+                .thenCompose(optPolicies -> {
+                    if (!optPolicies.isPresent()) {
+                        return FutureUtil.failedFuture(
+                                new ServerMetadataException("Namespace not found: " + name.getNamespace()));
+                    }
 
-        Set<String> configuredClusters;
-        if (policies.replication_clusters != null) {
-            configuredClusters = Sets.newTreeSet(policies.replication_clusters);
-        } else {
-            configuredClusters = Collections.emptySet();
-        }
+                    Policies policies = optPolicies.get();
 
-        String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
+                    //Ignore current broker's config for messageTTL for replication.
+                    final int newMessageTTLinSeconds;
+                    try {
+                        newMessageTTLinSeconds = getMessageTTL();
+                    } catch (Exception e) {
+                        return FutureUtil.failedFuture(new ServerMetadataException(e));
+                    }
 
-        // if local cluster is removed from global namespace cluster-list : then delete topic forcefully because pulsar
-        // doesn't serve global topic without local repl-cluster configured.
-        if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) {
-            log.info("Deleting topic [{}] because local cluster is not part of global namespace repl list {}",
-                    topic, configuredClusters);
-            return deleteForcefully();
-        }
+                    Set<String> configuredClusters;
+                    if (policies.replication_clusters != null) {
+                        configuredClusters = Sets.newTreeSet(policies.replication_clusters);
+                    } else {
+                        configuredClusters = Collections.emptySet();
+                    }
 
-        List<CompletableFuture<Void>> futures = Lists.newArrayList();
+                    String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
+
+                    // If the local cluster has been removed from the global namespace's cluster list,
+                    // delete the topic forcefully, because Pulsar does not serve a global topic
+                    // without the local replication cluster configured.
+                    if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) {
+                        log.info(
+                                "Deleting topic [{}] because local cluster is not part of global namespace repl list "
+                                        + "{}",
+                                topic, configuredClusters);
+                        return deleteForcefully();
+                    }
 
-        // Check for missing replicators
-        for (String cluster : configuredClusters) {
-            if (cluster.equals(localCluster)) {
-                continue;
-            }
+                    List<CompletableFuture<Void>> futures = Lists.newArrayList();
 
-            if (!replicators.containsKey(cluster)) {
-                futures.add(startReplicator(cluster));
-            }
-        }
+                    // Check for missing replicators
+                    for (String cluster : configuredClusters) {
+                        if (cluster.equals(localCluster)) {
+                            continue;
+                        }
 
-        // Check for replicators to be stopped
-        replicators.forEach((cluster, replicator) -> {
-            // Update message TTL
-            ((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLinSeconds);
+                        if (!replicators.containsKey(cluster)) {
+                            futures.add(startReplicator(cluster));
+                        }
+                    }
 
-            if (!cluster.equals(localCluster)) {
-                if (!configuredClusters.contains(cluster)) {
-                    futures.add(removeReplicator(cluster));
-                }
-            }
+                    // Check for replicators to be stopped
+                    replicators.forEach((cluster, replicator) -> {
+                        // Update message TTL
+                        ((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLinSeconds);
 
-        });
+                        if (!cluster.equals(localCluster)) {
+                            if (!configuredClusters.contains(cluster)) {
+                                futures.add(removeReplicator(cluster));
+                            }
+                        }
 
-        return FutureUtil.waitForAll(futures);
+                    });
+
+                    return FutureUtil.waitForAll(futures);
+                });
     }
 
     @Override