You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/07/31 06:31:43 UTC

[pulsar] branch branch-2.7 updated: [Branch 2.7] Revert some PRs to fix CI for branch 2.7 (#16882)

This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new d966c1af89f [Branch 2.7] Revert some PRs to fix CI for branch 2.7 (#16882)
d966c1af89f is described below

commit d966c1af89f7447470c0cf853f28f466dfdaf9da
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Sun Jul 31 14:31:36 2022 +0800

    [Branch 2.7] Revert some PRs to fix CI for branch 2.7 (#16882)
    
    * Revert "[fix][proxy] Fix client service url (#16834)"
    
    This reverts commit 10b4e99a815d1c36660c315171f71d1f866a4f74.
    
    * Revert "[Build] Use grpc-bom to align grpc library versions (#15234)"
    
    This reverts commit 99c93d2f5fe2f72e185d46813383c7f7c984022d.
    
    * Revert "upgrade aircompressor to 0.20 (#11790)"
    
    This reverts commit 5ad16b623fdf3a2157577694ff4bc7fd06450a9b.
    
    * Revert "[Branch-2.7] Fixed deadlock on metadata cache missing while doing checkReplication (#12484)"
    
    This reverts commit 32fe228854464504d18de240f719b583cf262042.
    
    * Revert changes of PersistentTopic#getMessageTTL in #12339.
    
    Co-authored-by: JiangHaiting <ja...@apache.org>
---
 distribution/server/src/assemble/LICENSE.bin.txt   |   2 +-
 pom.xml                                            |  32 ++--
 .../pulsar/broker/service/BrokerService.java       | 210 ++++++++++-----------
 .../broker/service/persistent/PersistentTopic.java | 115 ++++++-----
 .../pulsar/proxy/server/ProxyConnection.java       |   4 +-
 .../pulsar/proxy/server/ProxyConnectionTest.java   |  24 ---
 pulsar-sql/presto-distribution/LICENSE             |   2 +-
 7 files changed, 173 insertions(+), 216 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 936528ad8b7..41052de646e 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -425,7 +425,7 @@ The Apache Software License, Version 2.0
     - org.apache.httpcomponents-httpclient-4.5.5.jar
     - org.apache.httpcomponents-httpcore-4.4.9.jar
  * AirCompressor
-    - io.airlift-aircompressor-0.20.jar
+    - io.airlift-aircompressor-0.16.jar
  * AsyncHttpClient
     - org.asynchttpclient-async-http-client-2.12.1.jar
     - org.asynchttpclient-async-http-client-netty-utils-2.12.1.jar
diff --git a/pom.xml b/pom.xml
index 40d045ec0dc..718ef2d81df 100644
--- a/pom.xml
+++ b/pom.xml
@@ -157,7 +157,7 @@ flexible messaging model and an intuitive client API.</description>
     <prometheus-jmx.version>0.14.0</prometheus-jmx.version>
     <confluent.version>5.3.2</confluent.version>
     <kafka-avro-convert-jackson.version>1.9.13</kafka-avro-convert-jackson.version>
-    <aircompressor.version>0.20</aircompressor.version>
+    <aircompressor.version>0.16</aircompressor.version>
     <asynchttpclient.version>2.12.1</asynchttpclient.version>
     <jcommander.version>1.78</jcommander.version>
     <commons-lang3.version>3.6</commons-lang3.version>
@@ -598,7 +598,7 @@ flexible messaging model and an intuitive client API.</description>
         <artifactId>jna</artifactId>
         <version>${jna.version}</version>
       </dependency>
-
+      
       <dependency>
          <groupId>com.github.docker-java</groupId>
          <artifactId>docker-java-core</artifactId>
@@ -787,14 +787,6 @@ flexible messaging model and an intuitive client API.</description>
         <version>${typetools.version}</version>
       </dependency>
 
-      <dependency>
-        <groupId>io.grpc</groupId>
-        <artifactId>grpc-bom</artifactId>
-        <version>${grpc.version}</version>
-        <type>pom</type>
-        <scope>import</scope>
-      </dependency>
-
       <dependency>
         <groupId>io.grpc</groupId>
         <artifactId>grpc-all</artifactId>
@@ -806,7 +798,25 @@ flexible messaging model and an intuitive client API.</description>
           </exclusion>
         </exclusions>
       </dependency>
-      
+
+      <dependency>
+        <groupId>io.grpc</groupId>
+        <artifactId>grpc-core</artifactId>
+        <version>${grpc.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>io.grpc</groupId>
+        <artifactId>grpc-stub</artifactId>
+        <version>${grpc.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>io.grpc</groupId>
+        <artifactId>grpc-protobuf-lite</artifactId>
+        <version>${grpc.version}</version>
+      </dependency>
+
       <dependency>
         <groupId>com.google.protobuf</groupId>
         <artifactId>protobuf-bom</artifactId>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 40fa540b9c0..29d1001e4bd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1190,139 +1190,119 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
             Optional<Policies> policies = Optional.empty();
             Optional<LocalPolicies> localPolicies = Optional.empty();
 
-            PersistencePolicies tmpPersistencePolicies = null;
-            RetentionPolicies tmpRetentionPolicies = null;
-            OffloadPolicies tmpTopicLevelOffloadPolicies = null;
+            PersistencePolicies persistencePolicies = null;
+            RetentionPolicies retentionPolicies = null;
+            OffloadPolicies topicLevelOffloadPolicies = null;
 
             if (pulsar.getConfig().isTopicLevelPoliciesEnabled()) {
                 try {
                     TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPolicies(topicName);
                     if (topicPolicies != null) {
-                        tmpPersistencePolicies = topicPolicies.getPersistence();
-                        tmpRetentionPolicies = topicPolicies.getRetentionPolicies();
-                        tmpTopicLevelOffloadPolicies = topicPolicies.getOffloadPolicies();
+                        persistencePolicies = topicPolicies.getPersistence();
+                        retentionPolicies = topicPolicies.getRetentionPolicies();
+                        topicLevelOffloadPolicies = topicPolicies.getOffloadPolicies();
                     }
                 } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
                     log.debug("Topic {} policies have not been initialized yet.", topicName);
                 }
             }
 
-            final PersistencePolicies finalPersistencePolicies = tmpPersistencePolicies;
-            final RetentionPolicies finalRetentionPolicies = tmpRetentionPolicies;
-            final OffloadPolicies finalTopicLevelOffloadPolicies = tmpTopicLevelOffloadPolicies;
-
-
-            CompletableFuture<Optional<Policies>> policiesFuture = pulsar
-                    .getConfigurationCache().policiesCache().getAsync(AdminResource.path(POLICIES,
-                            namespace.toString()));
-            String path = joinPath(LOCAL_POLICIES_ROOT, topicName.getNamespaceObject().toString());
-            CompletableFuture<Optional<LocalPolicies>> localPoliciesFuture =
-                    pulsar().getLocalZkCacheService().policiesCache().getAsync(path);
-
-            policiesFuture.thenCombine(localPoliciesFuture, (optPolicies, optLocalPolicies) -> {
-                PersistencePolicies persistencePolicies = finalPersistencePolicies;
-                RetentionPolicies retentionPolicies = finalRetentionPolicies;
-                OffloadPolicies topicLevelOffloadPolicies = finalTopicLevelOffloadPolicies;
-
-                if (persistencePolicies == null) {
-                    persistencePolicies = policies.map(p -> p.persistence).orElseGet(
-                            () -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
-                                    serviceConfig.getManagedLedgerDefaultWriteQuorum(),
-                                    serviceConfig.getManagedLedgerDefaultAckQuorum(),
-                                    serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
-                }
+            try {
+                policies = pulsar
+                        .getConfigurationCache().policiesCache().get(AdminResource.path(POLICIES,
+                                namespace.toString()));
+                String path = joinPath(LOCAL_POLICIES_ROOT, topicName.getNamespaceObject().toString());
+                localPolicies = pulsar().getLocalZkCacheService().policiesCache().get(path);
+            } catch (Throwable t) {
+                // Ignoring since if we don't have policies, we fallback on the default
+                log.warn("Got exception when reading persistence policy for {}: {}", topicName, t.getMessage(), t);
+                future.completeExceptionally(t);
+                return;
+            }
 
-                if (retentionPolicies == null) {
-                    retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
-                            () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
-                                    serviceConfig.getDefaultRetentionSizeInMB())
-                    );
-                }
+            if (persistencePolicies == null) {
+                persistencePolicies = policies.map(p -> p.persistence).orElseGet(
+                        () -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
+                                serviceConfig.getManagedLedgerDefaultWriteQuorum(),
+                                serviceConfig.getManagedLedgerDefaultAckQuorum(),
+                                serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
+            }
 
-                ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
-                managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
-                managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
-                managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
-                if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) {
-                    managedLedgerConfig
-                            .setBookKeeperEnsemblePlacementPolicyClassName(
-                                    ZkIsolatedBookieEnsemblePlacementPolicy.class);
-                    Map<String, Object> properties = Maps.newHashMap();
-                    properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
-                            localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupPrimary);
-                    properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
-                            localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupSecondary);
-                    managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
-                }
-                managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
-                managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
-                managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
-
-                managedLedgerConfig.setMaxUnackedRangesToPersist(
-                        serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
-                managedLedgerConfig.setMaxUnackedRangesToPersistInZk(
-                        serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper());
-                managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
-                managedLedgerConfig.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
-                        TimeUnit.MINUTES);
-                managedLedgerConfig.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
-                        TimeUnit.MINUTES);
-                managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
-
-                managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
-                        serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
-                managedLedgerConfig.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
-                managedLedgerConfig.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
-                managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
-                managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
-                        serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
-                managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
-                managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
+            if (retentionPolicies == null) {
+                retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
+                        () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
+                                serviceConfig.getDefaultRetentionSizeInMB())
+                );
+            }
+
+            ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+            managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
+            managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
+            managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
+            if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) {
                 managedLedgerConfig
-                        .setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
-
-                managedLedgerConfig.setLedgerRolloverTimeout(
-                        serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
-                managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
-                managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
-                managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
-                managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
-
-                OffloadPolicies nsLevelOffloadPolicies = policies.map(p -> p.offload_policies).orElse(null);
-                OffloadPolicies offloadPolicies = OffloadPolicies.mergeConfiguration(
-                        topicLevelOffloadPolicies,
-                        OffloadPolicies.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
-                        getPulsar().getConfig().getProperties());
-                if (topicLevelOffloadPolicies != null) {
-                    try {
-                        LedgerOffloader topicLevelLedgerOffLoader =
-                                pulsar().createManagedLedgerOffloader(offloadPolicies);
-                        managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
-                    } catch (PulsarServerException e) {
-                        future.completeExceptionally(e);
-                        return null;
-                    }
-                } else {
-                    //If the topic level policy is null, use the namespace level
-                    managedLedgerConfig.setLedgerOffloader(
-                            pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
+                        .setBookKeeperEnsemblePlacementPolicyClassName(ZkIsolatedBookieEnsemblePlacementPolicy.class);
+                Map<String, Object> properties = Maps.newHashMap();
+                properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
+                        localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupPrimary);
+                properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+                        localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupSecondary);
+                managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+            }
+            managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
+            managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
+            managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
+
+            managedLedgerConfig.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
+            managedLedgerConfig.setMaxUnackedRangesToPersistInZk(serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper());
+            managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
+            managedLedgerConfig.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
+                    TimeUnit.MINUTES);
+            managedLedgerConfig.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
+                    TimeUnit.MINUTES);
+            managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
+
+            managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
+                    serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
+            managedLedgerConfig.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
+            managedLedgerConfig.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
+            managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
+            managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
+                    serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
+            managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
+            managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
+            managedLedgerConfig
+                    .setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
+
+            managedLedgerConfig.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
+            managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
+            managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
+            managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
+            managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
+
+            OffloadPolicies nsLevelOffloadPolicies = policies.map(p -> p.offload_policies).orElse(null);
+            OffloadPolicies offloadPolicies = OffloadPolicies.mergeConfiguration(
+                    topicLevelOffloadPolicies,
+                    OffloadPolicies.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
+                    getPulsar().getConfig().getProperties());
+            if (topicLevelOffloadPolicies != null) {
+                try {
+                    LedgerOffloader topicLevelLedgerOffLoader = pulsar().createManagedLedgerOffloader(offloadPolicies);
+                    managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
+                } catch (PulsarServerException e) {
+                    future.completeExceptionally(e);
+                    return;
                 }
+            } else {
+                //If the topic level policy is null, use the namespace level
+                managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
+            }
 
-                managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
-                        serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
-                managedLedgerConfig.setNewEntriesCheckDelayInMillis(
-                        serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
-
-
-                future.complete(managedLedgerConfig);
-                return null;
-            }).exceptionally(ex -> {
-                log.warn("Got exception when reading persistence policy for {}: {}", topicName, ex.getMessage(), ex);
-                future.completeExceptionally(ex);
-                return null;
-            });
+            managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
+            managedLedgerConfig.setNewEntriesCheckDelayInMillis(serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
 
 
+            future.complete(managedLedgerConfig);
         }, (exception) -> future.completeExceptionally(exception)));
 
         return future;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c2fd32a44fd..a5613d439a5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -157,7 +157,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
     @SuppressWarnings("unused")
     private volatile long usageCount = 0;
 
-
     static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
 
     private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;
@@ -1156,72 +1155,68 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             log.debug("[{}] Checking replication status", name);
         }
 
-        return brokerService.pulsar().getConfigurationCache().policiesCache()
-                .getAsync(AdminResource.path(POLICIES, name.getNamespace()))
-                .thenCompose(optPolicies -> {
-                    if (!optPolicies.isPresent()) {
-                        return FutureUtil.failedFuture(
-                                new ServerMetadataException("Namespace not found: " + name.getNamespace()));
-                    }
-
-                    Policies policies = optPolicies.get();
+        Policies policies = null;
+        try {
+            policies = brokerService.pulsar().getConfigurationCache().policiesCache()
+                    .get(AdminResource.path(POLICIES, name.getNamespace()))
+                    .orElseThrow(() -> new KeeperException.NoNodeException());
+        } catch (Exception e) {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            future.completeExceptionally(new ServerMetadataException(e));
+            return future;
+        }
+        //Ignore current broker's config for messageTTL for replication.
+        final int newMessageTTLinSeconds;
+        try {
+            newMessageTTLinSeconds = getMessageTTL();
+        } catch (Exception e) {
+            return FutureUtil.failedFuture(new ServerMetadataException(e));
+        }
 
-                    //Ignore current broker's config for messageTTL for replication.
-                    final int newMessageTTLinSeconds;
-                    try {
-                        newMessageTTLinSeconds = getMessageTTL();
-                    } catch (Exception e) {
-                        return FutureUtil.failedFuture(new ServerMetadataException(e));
-                    }
+        Set<String> configuredClusters;
+        if (policies.replication_clusters != null) {
+            configuredClusters = Sets.newTreeSet(policies.replication_clusters);
+        } else {
+            configuredClusters = Collections.emptySet();
+        }
 
-                    Set<String> configuredClusters;
-                    if (policies.replication_clusters != null) {
-                        configuredClusters = Sets.newTreeSet(policies.replication_clusters);
-                    } else {
-                        configuredClusters = Collections.emptySet();
-                    }
+        String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
 
-                    String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
-
-                    // if local cluster is removed from global namespace cluster-list : then delete topic forcefully
-                    // because pulsar
-                    // doesn't serve global topic without local repl-cluster configured.
-                    if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) {
-                        log.info(
-                                "Deleting topic [{}] because local cluster is not part of global namespace repl list "
-                                        + "{}",
-                                topic, configuredClusters);
-                        return deleteForcefully();
-                    }
+        // if local cluster is removed from global namespace cluster-list : then delete topic forcefully because pulsar
+        // doesn't serve global topic without local repl-cluster configured.
+        if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) {
+            log.info("Deleting topic [{}] because local cluster is not part of global namespace repl list {}",
+                    topic, configuredClusters);
+            return deleteForcefully();
+        }
 
-                    List<CompletableFuture<Void>> futures = Lists.newArrayList();
+        List<CompletableFuture<Void>> futures = Lists.newArrayList();
 
-                    // Check for missing replicators
-                    for (String cluster : configuredClusters) {
-                        if (cluster.equals(localCluster)) {
-                            continue;
-                        }
+        // Check for missing replicators
+        for (String cluster : configuredClusters) {
+            if (cluster.equals(localCluster)) {
+                continue;
+            }
 
-                        if (!replicators.containsKey(cluster)) {
-                            futures.add(startReplicator(cluster));
-                        }
-                    }
+            if (!replicators.containsKey(cluster)) {
+                futures.add(startReplicator(cluster));
+            }
+        }
 
-                    // Check for replicators to be stopped
-                    replicators.forEach((cluster, replicator) -> {
-                        // Update message TTL
-                        ((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLinSeconds);
+        // Check for replicators to be stopped
+        replicators.forEach((cluster, replicator) -> {
+            // Update message TTL
+            ((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLinSeconds);
 
-                        if (!cluster.equals(localCluster)) {
-                            if (!configuredClusters.contains(cluster)) {
-                                futures.add(removeReplicator(cluster));
-                            }
-                        }
+            if (!cluster.equals(localCluster)) {
+                if (!configuredClusters.contains(cluster)) {
+                    futures.add(removeReplicator(cluster));
+                }
+            }
 
-                    });
+        });
 
-                    return FutureUtil.waitForAll(futures);
-                });
+        return FutureUtil.waitForAll(futures);
     }
 
     @Override
@@ -2480,10 +2475,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         TopicName name = TopicName.get(topic);
         TopicPolicies topicPolicies = getTopicPolicies(name);
         Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
-                .getDataIfPresent(AdminResource.path(POLICIES, name.getNamespace()));
-        if (policies == null) {
-          throw new KeeperException.NoNodeException();
-        }
+                .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
+                .orElseThrow(() -> new KeeperException.NoNodeException());
         if (topicPolicies != null && topicPolicies.isMessageTTLSet()) {
             return topicPolicies.getMessageTTLInSeconds();
         }
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 6ae0e52f961..3c73284e7ed 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -529,10 +529,8 @@ public class ProxyConnection extends PulsarHandler {
     ClientConfigurationData createClientConfiguration()
         throws PulsarClientException.UnsupportedAuthenticationException {
         ClientConfigurationData initialConf = new ClientConfigurationData();
+        initialConf.setServiceUrl(service.getServiceUrl());
         ProxyConfiguration proxyConfig = service.getConfiguration();
-        initialConf.setServiceUrl(
-                proxyConfig.isTlsEnabledWithBroker() ? service.getServiceUrlTls() : service.getServiceUrl());
-
         // Apply all arbitrary configuration. This must be called before setting any fields annotated as
         // @Secret on the ClientConfigurationData object because of the way they are serialized.
         // See https://github.com/apache/pulsar/issues/8509 for more information.
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java
index 8c07e4b42d7..5f533e37d35 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java
@@ -18,12 +18,8 @@
  */
 package org.apache.pulsar.proxy.server;
 
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.testng.annotations.Test;
 
 public class ProxyConnectionTest {
@@ -39,24 +35,4 @@ public class ProxyConnectionTest {
         assertFalse(ProxyConnection
                 .matchesHostAndPort("pulsar://", "pulsar://1.2.3.4:12345", "1.2.3.4:1234"));
     }
-    @Test
-    public void testCreateClientConfiguration() {
-        ProxyConfiguration proxyConfiguration = new ProxyConfiguration();
-        proxyConfiguration.setTlsEnabledWithBroker(true);
-        String proxyUrlTls = "pulsar+ssl://proxy:6651";
-        String proxyUrl = "pulsar://proxy:6650";
-
-        ProxyService proxyService = mock(ProxyService.class);
-        doReturn(proxyConfiguration).when(proxyService).getConfiguration();
-        doReturn(proxyUrlTls).when(proxyService).getServiceUrlTls();
-        doReturn(proxyUrl).when(proxyService).getServiceUrl();
-
-        ProxyConnection proxyConnection = new ProxyConnection(proxyService, null);
-        ClientConfigurationData clientConfiguration = proxyConnection.createClientConfiguration();
-        assertEquals(clientConfiguration.getServiceUrl(), proxyUrlTls);
-
-        proxyConfiguration.setTlsEnabledWithBroker(false);
-        clientConfiguration = proxyConnection.createClientConfiguration();
-        assertEquals(clientConfiguration.getServiceUrl(), proxyUrl);
-    }
 }
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index 906cad72435..a52dda264ba 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -273,7 +273,7 @@ The Apache Software License, Version 2.0
   * CGLIB Nodep
     - cglib-nodep-3.3.0.jar
   * Airlift
-    - aircompressor-0.20.jar
+    - aircompressor-0.16.jar
     - airline-0.8.jar
     - bootstrap-0.199.jar
     - bootstrap-0.195.jar