You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/07/31 06:31:43 UTC
[pulsar] branch branch-2.7 updated: [Branch 2.7] Revert some PRs to fix CI for branch 2.7 (#16882)
This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new d966c1af89f [Branch 2.7] Revert some PRs to fix CI for branch 2.7 (#16882)
d966c1af89f is described below
commit d966c1af89f7447470c0cf853f28f466dfdaf9da
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Sun Jul 31 14:31:36 2022 +0800
[Branch 2.7] Revert some PRs to fix CI for branch 2.7 (#16882)
* Revert "[fix][proxy] Fix client service url (#16834)"
This reverts commit 10b4e99a815d1c36660c315171f71d1f866a4f74.
* Revert "[Build] Use grpc-bom to align grpc library versions (#15234)"
This reverts commit 99c93d2f5fe2f72e185d46813383c7f7c984022d.
* Revert "upgrade aircompressor to 0.20 (#11790)"
This reverts commit 5ad16b623fdf3a2157577694ff4bc7fd06450a9b.
* Revert "[Branch-2.7] Fixed deadlock on metadata cache missing while doing checkReplication (#12484)"
This reverts commit 32fe228854464504d18de240f719b583cf262042.
* Revert changes of PersistentTopic#getMessageTTL in #12339.
Co-authored-by: JiangHaiting <ja...@apache.org>
---
distribution/server/src/assemble/LICENSE.bin.txt | 2 +-
pom.xml | 32 ++--
.../pulsar/broker/service/BrokerService.java | 210 ++++++++++-----------
.../broker/service/persistent/PersistentTopic.java | 115 ++++++-----
.../pulsar/proxy/server/ProxyConnection.java | 4 +-
.../pulsar/proxy/server/ProxyConnectionTest.java | 24 ---
pulsar-sql/presto-distribution/LICENSE | 2 +-
7 files changed, 173 insertions(+), 216 deletions(-)
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 936528ad8b7..41052de646e 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -425,7 +425,7 @@ The Apache Software License, Version 2.0
- org.apache.httpcomponents-httpclient-4.5.5.jar
- org.apache.httpcomponents-httpcore-4.4.9.jar
* AirCompressor
- - io.airlift-aircompressor-0.20.jar
+ - io.airlift-aircompressor-0.16.jar
* AsyncHttpClient
- org.asynchttpclient-async-http-client-2.12.1.jar
- org.asynchttpclient-async-http-client-netty-utils-2.12.1.jar
diff --git a/pom.xml b/pom.xml
index 40d045ec0dc..718ef2d81df 100644
--- a/pom.xml
+++ b/pom.xml
@@ -157,7 +157,7 @@ flexible messaging model and an intuitive client API.</description>
<prometheus-jmx.version>0.14.0</prometheus-jmx.version>
<confluent.version>5.3.2</confluent.version>
<kafka-avro-convert-jackson.version>1.9.13</kafka-avro-convert-jackson.version>
- <aircompressor.version>0.20</aircompressor.version>
+ <aircompressor.version>0.16</aircompressor.version>
<asynchttpclient.version>2.12.1</asynchttpclient.version>
<jcommander.version>1.78</jcommander.version>
<commons-lang3.version>3.6</commons-lang3.version>
@@ -598,7 +598,7 @@ flexible messaging model and an intuitive client API.</description>
<artifactId>jna</artifactId>
<version>${jna.version}</version>
</dependency>
-
+
<dependency>
<groupId>com.github.docker-java</groupId>
<artifactId>docker-java-core</artifactId>
@@ -787,14 +787,6 @@ flexible messaging model and an intuitive client API.</description>
<version>${typetools.version}</version>
</dependency>
- <dependency>
- <groupId>io.grpc</groupId>
- <artifactId>grpc-bom</artifactId>
- <version>${grpc.version}</version>
- <type>pom</type>
- <scope>import</scope>
- </dependency>
-
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
@@ -806,7 +798,25 @@ flexible messaging model and an intuitive client API.</description>
</exclusion>
</exclusions>
</dependency>
-
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-core</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf-lite</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-bom</artifactId>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 40fa540b9c0..29d1001e4bd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1190,139 +1190,119 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
Optional<Policies> policies = Optional.empty();
Optional<LocalPolicies> localPolicies = Optional.empty();
- PersistencePolicies tmpPersistencePolicies = null;
- RetentionPolicies tmpRetentionPolicies = null;
- OffloadPolicies tmpTopicLevelOffloadPolicies = null;
+ PersistencePolicies persistencePolicies = null;
+ RetentionPolicies retentionPolicies = null;
+ OffloadPolicies topicLevelOffloadPolicies = null;
if (pulsar.getConfig().isTopicLevelPoliciesEnabled()) {
try {
TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPolicies(topicName);
if (topicPolicies != null) {
- tmpPersistencePolicies = topicPolicies.getPersistence();
- tmpRetentionPolicies = topicPolicies.getRetentionPolicies();
- tmpTopicLevelOffloadPolicies = topicPolicies.getOffloadPolicies();
+ persistencePolicies = topicPolicies.getPersistence();
+ retentionPolicies = topicPolicies.getRetentionPolicies();
+ topicLevelOffloadPolicies = topicPolicies.getOffloadPolicies();
}
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.debug("Topic {} policies have not been initialized yet.", topicName);
}
}
- final PersistencePolicies finalPersistencePolicies = tmpPersistencePolicies;
- final RetentionPolicies finalRetentionPolicies = tmpRetentionPolicies;
- final OffloadPolicies finalTopicLevelOffloadPolicies = tmpTopicLevelOffloadPolicies;
-
-
- CompletableFuture<Optional<Policies>> policiesFuture = pulsar
- .getConfigurationCache().policiesCache().getAsync(AdminResource.path(POLICIES,
- namespace.toString()));
- String path = joinPath(LOCAL_POLICIES_ROOT, topicName.getNamespaceObject().toString());
- CompletableFuture<Optional<LocalPolicies>> localPoliciesFuture =
- pulsar().getLocalZkCacheService().policiesCache().getAsync(path);
-
- policiesFuture.thenCombine(localPoliciesFuture, (optPolicies, optLocalPolicies) -> {
- PersistencePolicies persistencePolicies = finalPersistencePolicies;
- RetentionPolicies retentionPolicies = finalRetentionPolicies;
- OffloadPolicies topicLevelOffloadPolicies = finalTopicLevelOffloadPolicies;
-
- if (persistencePolicies == null) {
- persistencePolicies = policies.map(p -> p.persistence).orElseGet(
- () -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
- serviceConfig.getManagedLedgerDefaultWriteQuorum(),
- serviceConfig.getManagedLedgerDefaultAckQuorum(),
- serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
- }
+ try {
+ policies = pulsar
+ .getConfigurationCache().policiesCache().get(AdminResource.path(POLICIES,
+ namespace.toString()));
+ String path = joinPath(LOCAL_POLICIES_ROOT, topicName.getNamespaceObject().toString());
+ localPolicies = pulsar().getLocalZkCacheService().policiesCache().get(path);
+ } catch (Throwable t) {
+ // Ignoring since if we don't have policies, we fallback on the default
+ log.warn("Got exception when reading persistence policy for {}: {}", topicName, t.getMessage(), t);
+ future.completeExceptionally(t);
+ return;
+ }
- if (retentionPolicies == null) {
- retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
- () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
- serviceConfig.getDefaultRetentionSizeInMB())
- );
- }
+ if (persistencePolicies == null) {
+ persistencePolicies = policies.map(p -> p.persistence).orElseGet(
+ () -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
+ serviceConfig.getManagedLedgerDefaultWriteQuorum(),
+ serviceConfig.getManagedLedgerDefaultAckQuorum(),
+ serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
+ }
- ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
- managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
- managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
- managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
- if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) {
- managedLedgerConfig
- .setBookKeeperEnsemblePlacementPolicyClassName(
- ZkIsolatedBookieEnsemblePlacementPolicy.class);
- Map<String, Object> properties = Maps.newHashMap();
- properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
- localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupPrimary);
- properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
- localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupSecondary);
- managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
- }
- managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
- managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
- managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
-
- managedLedgerConfig.setMaxUnackedRangesToPersist(
- serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
- managedLedgerConfig.setMaxUnackedRangesToPersistInZk(
- serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper());
- managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
- managedLedgerConfig.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
- TimeUnit.MINUTES);
- managedLedgerConfig.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
- TimeUnit.MINUTES);
- managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
-
- managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
- serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
- managedLedgerConfig.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
- managedLedgerConfig.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
- managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
- managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
- serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
- managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
- managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
+ if (retentionPolicies == null) {
+ retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
+ () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
+ serviceConfig.getDefaultRetentionSizeInMB())
+ );
+ }
+
+ ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+ managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
+ managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
+ managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
+ if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) {
managedLedgerConfig
- .setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
-
- managedLedgerConfig.setLedgerRolloverTimeout(
- serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
- managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
- managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
- managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
- managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
-
- OffloadPolicies nsLevelOffloadPolicies = policies.map(p -> p.offload_policies).orElse(null);
- OffloadPolicies offloadPolicies = OffloadPolicies.mergeConfiguration(
- topicLevelOffloadPolicies,
- OffloadPolicies.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
- getPulsar().getConfig().getProperties());
- if (topicLevelOffloadPolicies != null) {
- try {
- LedgerOffloader topicLevelLedgerOffLoader =
- pulsar().createManagedLedgerOffloader(offloadPolicies);
- managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
- } catch (PulsarServerException e) {
- future.completeExceptionally(e);
- return null;
- }
- } else {
- //If the topic level policy is null, use the namespace level
- managedLedgerConfig.setLedgerOffloader(
- pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
+ .setBookKeeperEnsemblePlacementPolicyClassName(ZkIsolatedBookieEnsemblePlacementPolicy.class);
+ Map<String, Object> properties = Maps.newHashMap();
+ properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
+ localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupPrimary);
+ properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
+ localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupSecondary);
+ managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
+ }
+ managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
+ managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType());
+ managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword());
+
+ managedLedgerConfig.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist());
+ managedLedgerConfig.setMaxUnackedRangesToPersistInZk(serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper());
+ managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger());
+ managedLedgerConfig.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(),
+ TimeUnit.MINUTES);
+ managedLedgerConfig.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(),
+ TimeUnit.MINUTES);
+ managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes());
+
+ managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
+ serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
+ managedLedgerConfig.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
+ managedLedgerConfig.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds());
+ managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
+ managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled(
+ serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled());
+ managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
+ managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
+ managedLedgerConfig
+ .setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger());
+
+ managedLedgerConfig.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds());
+ managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
+ managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
+ managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
+ managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
+
+ OffloadPolicies nsLevelOffloadPolicies = policies.map(p -> p.offload_policies).orElse(null);
+ OffloadPolicies offloadPolicies = OffloadPolicies.mergeConfiguration(
+ topicLevelOffloadPolicies,
+ OffloadPolicies.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)),
+ getPulsar().getConfig().getProperties());
+ if (topicLevelOffloadPolicies != null) {
+ try {
+ LedgerOffloader topicLevelLedgerOffLoader = pulsar().createManagedLedgerOffloader(offloadPolicies);
+ managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader);
+ } catch (PulsarServerException e) {
+ future.completeExceptionally(e);
+ return;
}
+ } else {
+ //If the topic level policy is null, use the namespace level
+ managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
+ }
- managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
- serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
- managedLedgerConfig.setNewEntriesCheckDelayInMillis(
- serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
-
-
- future.complete(managedLedgerConfig);
- return null;
- }).exceptionally(ex -> {
- log.warn("Got exception when reading persistence policy for {}: {}", topicName, ex.getMessage(), ex);
- future.completeExceptionally(ex);
- return null;
- });
+ managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
+ managedLedgerConfig.setNewEntriesCheckDelayInMillis(serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());
+ future.complete(managedLedgerConfig);
}, (exception) -> future.completeExceptionally(exception)));
return future;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c2fd32a44fd..a5613d439a5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -157,7 +157,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
@SuppressWarnings("unused")
private volatile long usageCount = 0;
-
static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;
@@ -1156,72 +1155,68 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
log.debug("[{}] Checking replication status", name);
}
- return brokerService.pulsar().getConfigurationCache().policiesCache()
- .getAsync(AdminResource.path(POLICIES, name.getNamespace()))
- .thenCompose(optPolicies -> {
- if (!optPolicies.isPresent()) {
- return FutureUtil.failedFuture(
- new ServerMetadataException("Namespace not found: " + name.getNamespace()));
- }
-
- Policies policies = optPolicies.get();
+ Policies policies = null;
+ try {
+ policies = brokerService.pulsar().getConfigurationCache().policiesCache()
+ .get(AdminResource.path(POLICIES, name.getNamespace()))
+ .orElseThrow(() -> new KeeperException.NoNodeException());
+ } catch (Exception e) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ future.completeExceptionally(new ServerMetadataException(e));
+ return future;
+ }
+ //Ignore current broker's config for messageTTL for replication.
+ final int newMessageTTLinSeconds;
+ try {
+ newMessageTTLinSeconds = getMessageTTL();
+ } catch (Exception e) {
+ return FutureUtil.failedFuture(new ServerMetadataException(e));
+ }
- //Ignore current broker's config for messageTTL for replication.
- final int newMessageTTLinSeconds;
- try {
- newMessageTTLinSeconds = getMessageTTL();
- } catch (Exception e) {
- return FutureUtil.failedFuture(new ServerMetadataException(e));
- }
+ Set<String> configuredClusters;
+ if (policies.replication_clusters != null) {
+ configuredClusters = Sets.newTreeSet(policies.replication_clusters);
+ } else {
+ configuredClusters = Collections.emptySet();
+ }
- Set<String> configuredClusters;
- if (policies.replication_clusters != null) {
- configuredClusters = Sets.newTreeSet(policies.replication_clusters);
- } else {
- configuredClusters = Collections.emptySet();
- }
+ String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
- String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
-
- // if local cluster is removed from global namespace cluster-list : then delete topic forcefully
- // because pulsar
- // doesn't serve global topic without local repl-cluster configured.
- if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) {
- log.info(
- "Deleting topic [{}] because local cluster is not part of global namespace repl list "
- + "{}",
- topic, configuredClusters);
- return deleteForcefully();
- }
+ // if local cluster is removed from global namespace cluster-list : then delete topic forcefully because pulsar
+ // doesn't serve global topic without local repl-cluster configured.
+ if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) {
+ log.info("Deleting topic [{}] because local cluster is not part of global namespace repl list {}",
+ topic, configuredClusters);
+ return deleteForcefully();
+ }
- List<CompletableFuture<Void>> futures = Lists.newArrayList();
+ List<CompletableFuture<Void>> futures = Lists.newArrayList();
- // Check for missing replicators
- for (String cluster : configuredClusters) {
- if (cluster.equals(localCluster)) {
- continue;
- }
+ // Check for missing replicators
+ for (String cluster : configuredClusters) {
+ if (cluster.equals(localCluster)) {
+ continue;
+ }
- if (!replicators.containsKey(cluster)) {
- futures.add(startReplicator(cluster));
- }
- }
+ if (!replicators.containsKey(cluster)) {
+ futures.add(startReplicator(cluster));
+ }
+ }
- // Check for replicators to be stopped
- replicators.forEach((cluster, replicator) -> {
- // Update message TTL
- ((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLinSeconds);
+ // Check for replicators to be stopped
+ replicators.forEach((cluster, replicator) -> {
+ // Update message TTL
+ ((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLinSeconds);
- if (!cluster.equals(localCluster)) {
- if (!configuredClusters.contains(cluster)) {
- futures.add(removeReplicator(cluster));
- }
- }
+ if (!cluster.equals(localCluster)) {
+ if (!configuredClusters.contains(cluster)) {
+ futures.add(removeReplicator(cluster));
+ }
+ }
- });
+ });
- return FutureUtil.waitForAll(futures);
- });
+ return FutureUtil.waitForAll(futures);
}
@Override
@@ -2480,10 +2475,8 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
TopicName name = TopicName.get(topic);
TopicPolicies topicPolicies = getTopicPolicies(name);
Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
- .getDataIfPresent(AdminResource.path(POLICIES, name.getNamespace()));
- if (policies == null) {
- throw new KeeperException.NoNodeException();
- }
+ .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
+ .orElseThrow(() -> new KeeperException.NoNodeException());
if (topicPolicies != null && topicPolicies.isMessageTTLSet()) {
return topicPolicies.getMessageTTLInSeconds();
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 6ae0e52f961..3c73284e7ed 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -529,10 +529,8 @@ public class ProxyConnection extends PulsarHandler {
ClientConfigurationData createClientConfiguration()
throws PulsarClientException.UnsupportedAuthenticationException {
ClientConfigurationData initialConf = new ClientConfigurationData();
+ initialConf.setServiceUrl(service.getServiceUrl());
ProxyConfiguration proxyConfig = service.getConfiguration();
- initialConf.setServiceUrl(
- proxyConfig.isTlsEnabledWithBroker() ? service.getServiceUrlTls() : service.getServiceUrl());
-
// Apply all arbitrary configuration. This must be called before setting any fields annotated as
// @Secret on the ClientConfigurationData object because of the way they are serialized.
// See https://github.com/apache/pulsar/issues/8509 for more information.
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java
index 8c07e4b42d7..5f533e37d35 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyConnectionTest.java
@@ -18,12 +18,8 @@
*/
package org.apache.pulsar.proxy.server;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
-import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.testng.annotations.Test;
public class ProxyConnectionTest {
@@ -39,24 +35,4 @@ public class ProxyConnectionTest {
assertFalse(ProxyConnection
.matchesHostAndPort("pulsar://", "pulsar://1.2.3.4:12345", "1.2.3.4:1234"));
}
- @Test
- public void testCreateClientConfiguration() {
- ProxyConfiguration proxyConfiguration = new ProxyConfiguration();
- proxyConfiguration.setTlsEnabledWithBroker(true);
- String proxyUrlTls = "pulsar+ssl://proxy:6651";
- String proxyUrl = "pulsar://proxy:6650";
-
- ProxyService proxyService = mock(ProxyService.class);
- doReturn(proxyConfiguration).when(proxyService).getConfiguration();
- doReturn(proxyUrlTls).when(proxyService).getServiceUrlTls();
- doReturn(proxyUrl).when(proxyService).getServiceUrl();
-
- ProxyConnection proxyConnection = new ProxyConnection(proxyService, null);
- ClientConfigurationData clientConfiguration = proxyConnection.createClientConfiguration();
- assertEquals(clientConfiguration.getServiceUrl(), proxyUrlTls);
-
- proxyConfiguration.setTlsEnabledWithBroker(false);
- clientConfiguration = proxyConnection.createClientConfiguration();
- assertEquals(clientConfiguration.getServiceUrl(), proxyUrl);
- }
}
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index 906cad72435..a52dda264ba 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -273,7 +273,7 @@ The Apache Software License, Version 2.0
* CGLIB Nodep
- cglib-nodep-3.3.0.jar
* Airlift
- - aircompressor-0.20.jar
+ - aircompressor-0.16.jar
- airline-0.8.jar
- bootstrap-0.199.jar
- bootstrap-0.195.jar