You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/07/28 06:45:11 UTC
[pulsar] branch branch-2.7 updated: [Branch-2.7] Fixed deadlock on metadata cache missing while doing checkReplication (#12484)
This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 32fe2288544 [Branch-2.7] Fixed deadlock on metadata cache missing while doing checkReplication (#12484)
32fe2288544 is described below
commit 32fe228854464504d18de240f719b583cf262042
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Jul 27 23:45:03 2022 -0700
[Branch-2.7] Fixed deadlock on metadata cache missing while doing checkReplication (#12484)
---
.../pulsar/broker/service/BrokerService.java | 210 +++++++++++----------
.../broker/service/persistent/PersistentTopic.java | 109 ++++++-----
2 files changed, 172 insertions(+), 147 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 29d1001e4bd..40fa540b9c0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1190,119 +1190,139 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
Optional<Policies> policies = Optional.empty();
Optional<LocalPolicies> localPolicies = Optional.empty();
- PersistencePolicies persistencePolicies = null;
- RetentionPolicies retentionPolicies = null;
- OffloadPolicies topicLevelOffloadPolicies = null;
+ PersistencePolicies tmpPersistencePolicies = null;
+ RetentionPolicies tmpRetentionPolicies = null;
+ OffloadPolicies tmpTopicLevelOffloadPolicies = null;
if (pulsar.getConfig().isTopicLevelPoliciesEnabled()) {
try {
TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPolicies(topicName);
if (topicPolicies != null) {
- persistencePolicies = topicPolicies.getPersistence();
- retentionPolicies = topicPolicies.getRetentionPolicies();
- topicLevelOffloadPolicies = topicPolicies.getOffloadPolicies();
+ tmpPersistencePolicies = topicPolicies.getPersistence();
+ tmpRetentionPolicies = topicPolicies.getRetentionPolicies();
+ tmpTopicLevelOffloadPolicies = topicPolicies.getOffloadPolicies();
}
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.debug("Topic {} policies have not been initialized yet.", topicName);
}
}
- try {
- policies = pulsar
- .getConfigurationCache().policiesCache().get(AdminResource.path(POLICIES,
- namespace.toString()));
- String path = joinPath(LOCAL_POLICIES_ROOT, topicName.getNamespaceObject().toString());
- localPolicies = pulsar().getLocalZkCacheService().policiesCache().get(path);
- } catch (Throwable t) {
- // Ignoring since if we don't have policies, we fallback on the default
- log.warn("Got exception when reading persistence policy for {}: {}", topicName, t.getMessage(), t);
- future.completeExceptionally(t);
- return;
- }
-
- if (persistencePolicies == null) {
- persistencePolicies = policies.map(p -> p.persistence).orElseGet(
- () -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
- serviceConfig.getManagedLedgerDefaultWriteQuorum(),
- serviceConfig.getManagedLedgerDefaultAckQuorum(),
- serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
- }
+ final PersistencePolicies finalPersistencePolicies = tmpPersistencePolicies;
+ final RetentionPolicies finalRetentionPolicies = tmpRetentionPolicies;
+ final OffloadPolicies finalTopicLevelOffloadPolicies = tmpTopicLevelOffloadPolicies;
+
+
+ CompletableFuture<Optional<Policies>> policiesFuture = pulsar
+ .getConfigurationCache().policiesCache().getAsync(AdminResource.path(POLICIES,
+ namespace.toString()));
+ String path = joinPath(LOCAL_POLICIES_ROOT, topicName.getNamespaceObject().toString());
+ CompletableFuture<Optional<LocalPolicies>> localPoliciesFuture =
+ pulsar().getLocalZkCacheService().policiesCache().getAsync(path);
+
+ policiesFuture.thenCombine(localPoliciesFuture, (optPolicies, optLocalPolicies) -> {
+ PersistencePolicies persistencePolicies = finalPersistencePolicies;
+ RetentionPolicies retentionPolicies = finalRetentionPolicies;
+ OffloadPolicies topicLevelOffloadPolicies = finalTopicLevelOffloadPolicies;
+
+ if (persistencePolicies == null) {
+ persistencePolicies = policies.map(p -> p.persistence).orElseGet(
+ () -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
+ serviceConfig.getManagedLedgerDefaultWriteQuorum(),
+ serviceConfig.getManagedLedgerDefaultAckQuorum(),
+ serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
+ }
- if (retentionPolicies == null) {
- retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
- () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
- serviceConfig.getDefaultRetentionSizeInMB())
- );
- }
+ if (retentionPolicies == null) {
+ retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
+ () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
+ serviceConfig.getDefaultRetentionSizeInMB())
+ );
+ }
- ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
- managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
- managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
- managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
- if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) {
+ ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+ managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
+ managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
+ managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
+ if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) {
+ managedLedgerConfig
+ .setBookKeeperEnsemblePlacementPolicyClassName(
+ ZkIsolatedBookieEnsemblePlacementPolicy.class);
+ Map<String, Object> properties = Maps.newHashMap();
+ properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
+ localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupPrimary);
+ properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+ localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupSecondary);
+ managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+ }
+ managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
+ managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
+ managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
+
+ managedLedgerConfig.setMaxUnackedRangesToPersist(
+ serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
+ managedLedgerConfig.setMaxUnackedRangesToPersistInZk(
+ serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper());
+ managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
+ managedLedgerConfig.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
+ TimeUnit.MINUTES);
+ managedLedgerConfig.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
+ TimeUnit.MINUTES);
+ managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
+
+ managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
+ serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
+ managedLedgerConfig.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
+ managedLedgerConfig.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
+ managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
+ managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
+ serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
+ managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
+ managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
managedLedgerConfig
- .setBookKeeperEnsemblePlacementPolicyClassName(ZkIsolatedBookieEnsemblePlacementPolicy.class);
- Map<String, Object> properties = Maps.newHashMap();
- properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
- localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupPrimary);
- properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
- localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupSecondary);
- managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
- }
- managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
- managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
- managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
-
- managedLedgerConfig.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
- managedLedgerConfig.setMaxUnackedRangesToPersistInZk(serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper());
- managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
- managedLedgerConfig.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
- TimeUnit.MINUTES);
- managedLedgerConfig.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
- TimeUnit.MINUTES);
- managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
-
- managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
- serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
- managedLedgerConfig.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
- managedLedgerConfig.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
- managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
- managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
- serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
- managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
- managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
- managedLedgerConfig
- .setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
-
- managedLedgerConfig.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
- managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
- managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
- managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
- managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
-
- OffloadPolicies nsLevelOffloadPolicies = policies.map(p -> p.offload_policies).orElse(null);
- OffloadPolicies offloadPolicies = OffloadPolicies.mergeConfiguration(
- topicLevelOffloadPolicies,
- OffloadPolicies.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
- getPulsar().getConfig().getProperties());
- if (topicLevelOffloadPolicies != null) {
- try {
- LedgerOffloader topicLevelLedgerOffLoader = pulsar().createManagedLedgerOffloader(offloadPolicies);
- managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
- } catch (PulsarServerException e) {
- future.completeExceptionally(e);
- return;
+ .setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
+
+ managedLedgerConfig.setLedgerRolloverTimeout(
+ serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
+ managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
+ managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
+ managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
+ managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
+
+ OffloadPolicies nsLevelOffloadPolicies = policies.map(p -> p.offload_policies).orElse(null);
+ OffloadPolicies offloadPolicies = OffloadPolicies.mergeConfiguration(
+ topicLevelOffloadPolicies,
+ OffloadPolicies.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
+ getPulsar().getConfig().getProperties());
+ if (topicLevelOffloadPolicies != null) {
+ try {
+ LedgerOffloader topicLevelLedgerOffLoader =
+ pulsar().createManagedLedgerOffloader(offloadPolicies);
+ managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
+ } catch (PulsarServerException e) {
+ future.completeExceptionally(e);
+ return null;
+ }
+ } else {
+ //If the topic level policy is null, use the namespace level
+ managedLedgerConfig.setLedgerOffloader(
+ pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
}
- } else {
- //If the topic level policy is null, use the namespace level
- managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
- }
- managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
- managedLedgerConfig.setNewEntriesCheckDelayInMillis(serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
+ managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
+ serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
+ managedLedgerConfig.setNewEntriesCheckDelayInMillis(
+ serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
+
+
+ future.complete(managedLedgerConfig);
+ return null;
+ }).exceptionally(ex -> {
+ log.warn("Got exception when reading persistence policy for {}: {}", topicName, ex.getMessage(), ex);
+ future.completeExceptionally(ex);
+ return null;
+ });
- future.complete(managedLedgerConfig);
}, (exception) -> future.completeExceptionally(exception)));
return future;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 67f17d4a4c3..c2fd32a44fd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -157,6 +157,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
@SuppressWarnings("unused")
private volatile long usageCount = 0;
+
static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;
@@ -1155,68 +1156,72 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
log.debug("[{}] Checking replication status", name);
}
- Policies policies = null;
- try {
- policies = brokerService.pulsar().getConfigurationCache().policiesCache()
- .get(AdminResource.path(POLICIES, name.getNamespace()))
- .orElseThrow(() -> new KeeperException.NoNodeException());
- } catch (Exception e) {
- CompletableFuture<Void> future = new CompletableFuture<>();
- future.completeExceptionally(new ServerMetadataException(e));
- return future;
- }
- //Ignore current broker's config for messageTTL for replication.
- final int newMessageTTLinSeconds;
- try {
- newMessageTTLinSeconds = getMessageTTL();
- } catch (Exception e) {
- return FutureUtil.failedFuture(new ServerMetadataException(e));
- }
+ return brokerService.pulsar().getConfigurationCache().policiesCache()
+ .getAsync(AdminResource.path(POLICIES, name.getNamespace()))
+ .thenCompose(optPolicies -> {
+ if (!optPolicies.isPresent()) {
+ return FutureUtil.failedFuture(
+ new ServerMetadataException("Namespace not found: " + name.getNamespace()));
+ }
- Set<String> configuredClusters;
- if (policies.replication_clusters != null) {
- configuredClusters = Sets.newTreeSet(policies.replication_clusters);
- } else {
- configuredClusters = Collections.emptySet();
- }
+ Policies policies = optPolicies.get();
- String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
+ //Ignore current broker's config for messageTTL for replication.
+ final int newMessageTTLinSeconds;
+ try {
+ newMessageTTLinSeconds = getMessageTTL();
+ } catch (Exception e) {
+ return FutureUtil.failedFuture(new ServerMetadataException(e));
+ }
- // if local cluster is removed from global namespace cluster-list : then delete topic forcefully because pulsar
- // doesn't serve global topic without local repl-cluster configured.
- if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) {
- log.info("Deleting topic [{}] because local cluster is not part of global namespace repl list {}",
- topic, configuredClusters);
- return deleteForcefully();
- }
+ Set<String> configuredClusters;
+ if (policies.replication_clusters != null) {
+ configuredClusters = Sets.newTreeSet(policies.replication_clusters);
+ } else {
+ configuredClusters = Collections.emptySet();
+ }
- List<CompletableFuture<Void>> futures = Lists.newArrayList();
+ String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
+
+ // if local cluster is removed from global namespace cluster-list : then delete topic forcefully
+ // because pulsar
+ // doesn't serve global topic without local repl-cluster configured.
+ if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) {
+ log.info(
+ "Deleting topic [{}] because local cluster is not part of global namespace repl list "
+ + "{}",
+ topic, configuredClusters);
+ return deleteForcefully();
+ }
- // Check for missing replicators
- for (String cluster : configuredClusters) {
- if (cluster.equals(localCluster)) {
- continue;
- }
+ List<CompletableFuture<Void>> futures = Lists.newArrayList();
- if (!replicators.containsKey(cluster)) {
- futures.add(startReplicator(cluster));
- }
- }
+ // Check for missing replicators
+ for (String cluster : configuredClusters) {
+ if (cluster.equals(localCluster)) {
+ continue;
+ }
- // Check for replicators to be stopped
- replicators.forEach((cluster, replicator) -> {
- // Update message TTL
- ((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLinSeconds);
+ if (!replicators.containsKey(cluster)) {
+ futures.add(startReplicator(cluster));
+ }
+ }
- if (!cluster.equals(localCluster)) {
- if (!configuredClusters.contains(cluster)) {
- futures.add(removeReplicator(cluster));
- }
- }
+ // Check for replicators to be stopped
+ replicators.forEach((cluster, replicator) -> {
+ // Update message TTL
+ ((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLinSeconds);
- });
+ if (!cluster.equals(localCluster)) {
+ if (!configuredClusters.contains(cluster)) {
+ futures.add(removeReplicator(cluster));
+ }
+ }
- return FutureUtil.waitForAll(futures);
+ });
+
+ return FutureUtil.waitForAll(futures);
+ });
}
@Override