You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/11/05 09:55:37 UTC
[pulsar] branch branch-2.8 updated: [Branch-2.8]Fix cherry-pick code style (#12627)
This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 69a4dbb [Branch-2.8]Fix cherry-pick code style (#12627)
69a4dbb is described below
commit 69a4dbb340338cfac6d6dd7f34ae2a7733bad7d2
Author: Hang Chen <ch...@apache.org>
AuthorDate: Fri Nov 5 17:54:43 2021 +0800
[Branch-2.8]Fix cherry-pick code style (#12627)
### Motivation
Fix cherry-pick code style
* fix code style
* revert PR#12526
* fix test
* fix cherry-pick issue
* fix failed test
---
.../broker/admin/impl/PersistentTopicsBase.java | 3 +--
.../pulsar/broker/service/BrokerService.java | 19 ++++++++++---------
.../pulsar/broker/service/PersistentTopicTest.java | 11 +++++++++++
.../api/AuthenticatedProducerConsumerTest.java | 22 +++++++++++++++-------
.../include/pulsar/c/consumer_configuration.h | 6 ------
pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc | 10 ----------
6 files changed, 37 insertions(+), 34 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 94c0a16..1db53c0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -132,7 +132,6 @@ import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.metadata.api.MetadataStoreException;
-import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -590,7 +589,7 @@ public class PersistentTopicsBase extends AdminResource {
// delete authentication policies of the partitioned topic
CompletableFuture<Void> deleteAuthFuture = new CompletableFuture<>();
pulsar().getPulsarResources().getNamespaceResources()
- .setAsync(topicName.getNamespace(), p -> {
+ .setAsync(path(POLICIES, topicName.getNamespace()), p -> {
for (int i = 0; i < numPartitions; ++i) {
p.auth_policies.getTopicAuthentication().remove(topicName.getPartition(i).toString());
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 0b81625..62124fb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -26,6 +26,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
+import static org.apache.pulsar.broker.web.PulsarWebResource.path;
import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
@@ -96,7 +97,6 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
@@ -1021,7 +1021,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
}
NamespaceName namespaceName = TopicName.get(topic).getNamespaceObject();
// Check whether there are auth policies for the topic
- pulsar.getPulsarResources().getNamespaceResources().getAsync(namespaceName.toString()).thenAccept(optPolicies -> {
+ pulsar.getPulsarResources().getNamespaceResources()
+ .getAsync(path(POLICIES, namespaceName.toString())).thenAccept(optPolicies -> {
if (!optPolicies.isPresent() || !optPolicies.get().auth_policies.getTopicAuthentication()
.containsKey(topic)) {
// if there is no auth policy for the topic, just complete and return
@@ -1032,7 +1033,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
return;
}
pulsar.getPulsarResources().getNamespaceResources()
- .setAsync(TopicName.get(topic).getNamespace(), p -> {
+ .setAsync(path(POLICIES, TopicName.get(topic).getNamespace()), p -> {
p.auth_policies.getTopicAuthentication().remove(topic);
return p;
}).thenAccept(v -> {
@@ -1105,7 +1106,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
return replicationClients.computeIfAbsent(cluster, key -> {
try {
- String path = PulsarWebResource.path("clusters", cluster);
+ String path = path("clusters", cluster);
ClusterDataImpl data = this.pulsar.getConfigurationCache().clustersCache().get(path)
.orElseThrow(() -> new KeeperException.NoNodeException(path));
ClientBuilder clientBuilder = PulsarClient.builder()
@@ -1180,7 +1181,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
}
return clusterAdmins.computeIfAbsent(cluster, key -> {
try {
- String path = PulsarWebResource.path("clusters", cluster);
+ String path = path("clusters", cluster);
ClusterDataImpl data = this.pulsar.getConfigurationCache().clustersCache().get(path)
.orElseThrow(() -> new KeeperException.NoNodeException(path));
@@ -1407,7 +1408,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
try {
policies = pulsar
- .getConfigurationCache().policiesCache().get(AdminResource.path(POLICIES,
+ .getConfigurationCache().policiesCache().get(path(POLICIES,
namespace.toString()));
String path = joinPath(LOCAL_POLICIES_ROOT, topicName.getNamespaceObject().toString());
localPolicies = pulsar().getLocalZkCacheService().policiesCache().get(path);
@@ -2628,7 +2629,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName) {
try {
Optional<Policies> policies = pulsar.getConfigurationCache().policiesCache()
- .get(AdminResource.path(POLICIES, topicName.getNamespace()));
+ .get(path(POLICIES, topicName.getNamespace()));
// If namespace policies have the field set, it will override the broker-level setting
if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) {
@@ -2662,7 +2663,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(final TopicName topicName) {
try {
Optional<Policies> policies = pulsar.getConfigurationCache().policiesCache()
- .get(AdminResource.path(POLICIES, topicName.getNamespace()));
+ .get(path(POLICIES, topicName.getNamespace()));
// If namespace policies have the field set, it will override the broker-level setting
if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) {
return policies.get().autoSubscriptionCreationOverride;
@@ -2702,7 +2703,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
Integer maxTopicsPerNamespace;
try {
maxTopicsPerNamespace = pulsar.getConfigurationCache().policiesCache()
- .get(AdminResource.path(POLICIES, topicName.getNamespace()))
+ .get(path(POLICIES, topicName.getNamespace()))
.map(p -> p.max_topics_per_namespace)
.orElse(null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index a174edb..1f9fa79 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -96,6 +96,7 @@ import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
@@ -128,6 +129,9 @@ import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.ZooKeeper;
@@ -154,6 +158,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
private BrokerService brokerService;
private ManagedLedgerFactory mlFactoryMock;
private ServerCnx serverCnx;
+ private MetadataStoreExtended store;
private ManagedLedger ledgerMock;
private ManagedCursor cursorMock;
private ConfigurationCacheService configCacheService;
@@ -212,6 +217,12 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
doReturn(zkCache).when(pulsar).getLocalZkCacheService();
doReturn(executor).when(pulsar).getOrderedExecutor();
+ store = new ZKMetadataStore(mockZk);
+ PulsarResources pulsarResources = spy(new PulsarResources(store, store));
+ NamespaceResources nsr = spy(new NamespaceResources(store, 30));
+ doReturn(nsr).when(pulsarResources).getNamespaceResources();
+ doReturn(pulsarResources).when(pulsar).getPulsarResources();
+
brokerService = spy(new BrokerService(pulsar, eventLoopGroup));
doReturn(brokerService).when(pulsar).getBrokerService();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
index a12c62f..e521550 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
@@ -33,6 +33,8 @@ import java.util.concurrent.TimeUnit;
import javax.ws.rs.InternalServerErrorException;
import org.apache.pulsar.broker.authentication.AuthenticationProviderBasic;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.auth.AuthenticationBasic;
@@ -359,15 +361,17 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
admin.topics().grantPermission(topic, "test-user", EnumSet.of(AuthAction.consume));
Awaitility.await().untilAsserted(() -> {
- assertTrue(pulsar.getPulsarResources().getNamespaceResources().get(NamespaceName.get("p1/ns1").toString())
+ assertTrue(pulsar.getPulsarResources().getNamespaceResources().get(PulsarWebResource.path("policies",
+ NamespaceName.get("p1/ns1").toString()))
.get().auth_policies.getTopicAuthentication().containsKey(topic));
});
admin.topics().delete(topic);
Awaitility.await().untilAsserted(() -> {
- assertFalse(pulsar.getPulsarResources().getNamespaceResources().get(NamespaceName.get("p1/ns1").toString())
- .get().auth_policies.getTopicAuthentication().containsKey(topic));
+ assertFalse(pulsar.getPulsarResources().getNamespaceResources().getAsync(PulsarWebResource.path("policies",
+ NamespaceName.get("p1/ns1").toString()))
+ .get().get().auth_policies.getTopicAuthentication().containsKey(topic));
});
// test for partitioned topic
@@ -379,10 +383,12 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
.grantPermission(partitionedTopic, "test-user", EnumSet.of(AuthAction.consume));
Awaitility.await().untilAsserted(() -> {
- assertTrue(pulsar.getPulsarResources().getNamespaceResources().get(NamespaceName.get("p1/ns1").toString())
+ assertTrue(pulsar.getPulsarResources().getNamespaceResources().get(PulsarWebResource.path("policies",
+ NamespaceName.get("p1/ns1").toString()))
.get().auth_policies.getTopicAuthentication().containsKey(partitionedTopic));
for (int i = 0; i < numPartitions; i++) {
- assertTrue(pulsar.getPulsarResources().getNamespaceResources().get(NamespaceName.get("p1/ns1").toString())
+ assertTrue(pulsar.getPulsarResources().getNamespaceResources().get(PulsarWebResource.path("policies",
+ NamespaceName.get("p1/ns1").toString()))
.get().auth_policies.getTopicAuthentication()
.containsKey(TopicName.get(partitionedTopic).getPartition(i).toString()));
}
@@ -390,10 +396,12 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
admin.topics().deletePartitionedTopic("persistent://p1/ns1/partitioned-topic");
Awaitility.await().untilAsserted(() -> {
- assertFalse(pulsar.getPulsarResources().getNamespaceResources().get(NamespaceName.get("p1/ns1").toString())
+ assertFalse(pulsar.getPulsarResources().getNamespaceResources().get(PulsarWebResource.path("policies",
+ NamespaceName.get("p1/ns1").toString()))
.get().auth_policies.getTopicAuthentication().containsKey(partitionedTopic));
for (int i = 0; i < numPartitions; i++) {
- assertFalse(pulsar.getPulsarResources().getNamespaceResources().get(NamespaceName.get("p1/ns1").toString())
+ assertFalse(pulsar.getPulsarResources().getNamespaceResources().get(PulsarWebResource.path("policies",
+ NamespaceName.get("p1/ns1").toString()))
.get().auth_policies.getTopicAuthentication()
.containsKey(TopicName.get(partitionedTopic).getPartition(i).toString()));
}
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
index a11e11e..efe353a 100644
--- a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
@@ -275,12 +275,6 @@ PULSAR_PUBLIC void pulsar_consumer_set_subscription_initial_position(
PULSAR_PUBLIC void pulsar_consumer_configuration_set_property(pulsar_consumer_configuration_t *conf,
const char *name, const char *value);
-PULSAR_PUBLIC void pulsar_consumer_configuration_set_priority_level(
- pulsar_consumer_configuration_t *consumer_configuration, int priority_level);
-
-PULSAR_PUBLIC int pulsar_consumer_configuration_get_priority_level(
- pulsar_consumer_configuration_t *consumer_configuration);
-
// const CryptoKeyReaderPtr getCryptoKeyReader()
//
// const;
diff --git a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
index aaec12c..90c60df 100644
--- a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
@@ -185,13 +185,3 @@ int pulsar_consumer_get_subscription_initial_position(
pulsar_consumer_configuration_t *consumer_configuration) {
return consumer_configuration->consumerConfiguration.getSubscriptionInitialPosition();
}
-
-void pulsar_consumer_configuration_set_priority_level(pulsar_consumer_configuration_t *consumer_configuration,
- int priority_level) {
- consumer_configuration->consumerConfiguration.setPriorityLevel(priority_level);
-}
-
-int pulsar_consumer_configuration_get_priority_level(
- pulsar_consumer_configuration_t *consumer_configuration) {
- return consumer_configuration->consumerConfiguration.getPriorityLevel();
-}