You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/11/05 09:55:37 UTC

[pulsar] branch branch-2.8 updated: [Branch-2.8]Fix cherry-pick code style (#12627)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 69a4dbb  [Branch-2.8]Fix cherry-pick code style (#12627)
69a4dbb is described below

commit 69a4dbb340338cfac6d6dd7f34ae2a7733bad7d2
Author: Hang Chen <ch...@apache.org>
AuthorDate: Fri Nov 5 17:54:43 2021 +0800

    [Branch-2.8]Fix cherry-pick code style (#12627)
    
    ### Motivation
    Fix cherry-pick code style
    
    * fix code style
    
    * revert PR#12526
    
    * fix test
    
    * fix cherry-pick issue
    
    * fix failed test
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  3 +--
 .../pulsar/broker/service/BrokerService.java       | 19 ++++++++++---------
 .../pulsar/broker/service/PersistentTopicTest.java | 11 +++++++++++
 .../api/AuthenticatedProducerConsumerTest.java     | 22 +++++++++++++++-------
 .../include/pulsar/c/consumer_configuration.h      |  6 ------
 pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc | 10 ----------
 6 files changed, 37 insertions(+), 34 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 94c0a16..1db53c0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -132,7 +132,6 @@ import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
-import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -590,7 +589,7 @@ public class PersistentTopicsBase extends AdminResource {
                 // delete authentication policies of the partitioned topic
                 CompletableFuture<Void> deleteAuthFuture = new CompletableFuture<>();
                 pulsar().getPulsarResources().getNamespaceResources()
-                    .setAsync(topicName.getNamespace(), p -> {
+                    .setAsync(path(POLICIES, topicName.getNamespace()), p -> {
                         for (int i = 0; i < numPartitions; ++i) {
                             p.auth_policies.getTopicAuthentication().remove(topicName.getPartition(i).toString());
                         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 0b81625..62124fb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -26,6 +26,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
 import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
+import static org.apache.pulsar.broker.web.PulsarWebResource.path;
 import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
@@ -96,7 +97,6 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
@@ -1021,7 +1021,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         }
         NamespaceName namespaceName = TopicName.get(topic).getNamespaceObject();
         // Check whether there are auth policies for the topic
-        pulsar.getPulsarResources().getNamespaceResources().getAsync(namespaceName.toString()).thenAccept(optPolicies -> {
+        pulsar.getPulsarResources().getNamespaceResources()
+            .getAsync(path(POLICIES, namespaceName.toString())).thenAccept(optPolicies -> {
             if (!optPolicies.isPresent() || !optPolicies.get().auth_policies.getTopicAuthentication()
                     .containsKey(topic)) {
                 // if there is no auth policy for the topic, just complete and return
@@ -1032,7 +1033,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                 return;
             }
             pulsar.getPulsarResources().getNamespaceResources()
-                    .setAsync(TopicName.get(topic).getNamespace(), p -> {
+                    .setAsync(path(POLICIES, TopicName.get(topic).getNamespace()), p -> {
                         p.auth_policies.getTopicAuthentication().remove(topic);
                         return p;
                     }).thenAccept(v -> {
@@ -1105,7 +1106,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
 
         return replicationClients.computeIfAbsent(cluster, key -> {
             try {
-                String path = PulsarWebResource.path("clusters", cluster);
+                String path = path("clusters", cluster);
                 ClusterDataImpl data = this.pulsar.getConfigurationCache().clustersCache().get(path)
                         .orElseThrow(() -> new KeeperException.NoNodeException(path));
                 ClientBuilder clientBuilder = PulsarClient.builder()
@@ -1180,7 +1181,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         }
         return clusterAdmins.computeIfAbsent(cluster, key -> {
             try {
-                String path = PulsarWebResource.path("clusters", cluster);
+                String path = path("clusters", cluster);
                 ClusterDataImpl data = this.pulsar.getConfigurationCache().clustersCache().get(path)
                         .orElseThrow(() -> new KeeperException.NoNodeException(path));
 
@@ -1407,7 +1408,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
 
             try {
                 policies = pulsar
-                        .getConfigurationCache().policiesCache().get(AdminResource.path(POLICIES,
+                        .getConfigurationCache().policiesCache().get(path(POLICIES,
                                 namespace.toString()));
                 String path = joinPath(LOCAL_POLICIES_ROOT, topicName.getNamespaceObject().toString());
                 localPolicies = pulsar().getLocalZkCacheService().policiesCache().get(path);
@@ -2628,7 +2629,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
     private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName) {
         try {
             Optional<Policies> policies = pulsar.getConfigurationCache().policiesCache()
-                .get(AdminResource.path(POLICIES, topicName.getNamespace()));
+                .get(path(POLICIES, topicName.getNamespace()));
 
             // If namespace policies have the field set, it will override the broker-level setting
             if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) {
@@ -2662,7 +2663,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
     private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(final TopicName topicName) {
         try {
             Optional<Policies> policies = pulsar.getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, topicName.getNamespace()));
+                    .get(path(POLICIES, topicName.getNamespace()));
             // If namespace policies have the field set, it will override the broker-level setting
             if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) {
                 return policies.get().autoSubscriptionCreationOverride;
@@ -2702,7 +2703,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         Integer maxTopicsPerNamespace;
         try {
             maxTopicsPerNamespace = pulsar.getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, topicName.getNamespace()))
+                    .get(path(POLICIES, topicName.getNamespace()))
                     .map(p -> p.max_topics_per_namespace)
                     .orElse(null);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index a174edb..1f9fa79 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -96,6 +96,7 @@ import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.broker.resources.NamespaceResources;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
@@ -128,6 +129,9 @@ import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.compaction.CompactedTopic;
 import org.apache.pulsar.compaction.CompactedTopicContext;
 import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.apache.zookeeper.ZooKeeper;
@@ -154,6 +158,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
     private BrokerService brokerService;
     private ManagedLedgerFactory mlFactoryMock;
     private ServerCnx serverCnx;
+    private MetadataStoreExtended store;
     private ManagedLedger ledgerMock;
     private ManagedCursor cursorMock;
     private ConfigurationCacheService configCacheService;
@@ -212,6 +217,12 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         doReturn(zkCache).when(pulsar).getLocalZkCacheService();
         doReturn(executor).when(pulsar).getOrderedExecutor();
 
+        store = new ZKMetadataStore(mockZk);
+        PulsarResources pulsarResources = spy(new PulsarResources(store, store));
+        NamespaceResources nsr = spy(new NamespaceResources(store, 30));
+        doReturn(nsr).when(pulsarResources).getNamespaceResources();
+        doReturn(pulsarResources).when(pulsar).getPulsarResources();
+
         brokerService = spy(new BrokerService(pulsar, eventLoopGroup));
         doReturn(brokerService).when(pulsar).getBrokerService();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
index a12c62f..e521550 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java
@@ -33,6 +33,8 @@ import java.util.concurrent.TimeUnit;
 import javax.ws.rs.InternalServerErrorException;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderBasic;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.impl.auth.AuthenticationBasic;
@@ -359,15 +361,17 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
         admin.topics().grantPermission(topic, "test-user", EnumSet.of(AuthAction.consume));
 
         Awaitility.await().untilAsserted(() -> {
-            assertTrue(pulsar.getPulsarResources().getNamespaceResources().get(NamespaceName.get("p1/ns1").toString())
+            assertTrue(pulsar.getPulsarResources().getNamespaceResources().get(PulsarWebResource.path("policies",
+                NamespaceName.get("p1/ns1").toString()))
                 .get().auth_policies.getTopicAuthentication().containsKey(topic));
         });
 
         admin.topics().delete(topic);
 
         Awaitility.await().untilAsserted(() -> {
-            assertFalse(pulsar.getPulsarResources().getNamespaceResources().get(NamespaceName.get("p1/ns1").toString())
-                    .get().auth_policies.getTopicAuthentication().containsKey(topic));
+            assertFalse(pulsar.getPulsarResources().getNamespaceResources().getAsync(PulsarWebResource.path("policies",
+                NamespaceName.get("p1/ns1").toString()))
+                    .get().get().auth_policies.getTopicAuthentication().containsKey(topic));
         });
 
         // test for partitioned topic
@@ -379,10 +383,12 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
                 .grantPermission(partitionedTopic, "test-user", EnumSet.of(AuthAction.consume));
 
         Awaitility.await().untilAsserted(() -> {
-            assertTrue(pulsar.getPulsarResources().getNamespaceResources().get(NamespaceName.get("p1/ns1").toString())
+            assertTrue(pulsar.getPulsarResources().getNamespaceResources().get(PulsarWebResource.path("policies",
+                NamespaceName.get("p1/ns1").toString()))
                     .get().auth_policies.getTopicAuthentication().containsKey(partitionedTopic));
             for (int i = 0; i < numPartitions; i++) {
-                assertTrue(pulsar.getPulsarResources().getNamespaceResources().get(NamespaceName.get("p1/ns1").toString())
+                assertTrue(pulsar.getPulsarResources().getNamespaceResources().get(PulsarWebResource.path("policies",
+                    NamespaceName.get("p1/ns1").toString()))
                         .get().auth_policies.getTopicAuthentication()
                         .containsKey(TopicName.get(partitionedTopic).getPartition(i).toString()));
             }
@@ -390,10 +396,12 @@ public class AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
 
         admin.topics().deletePartitionedTopic("persistent://p1/ns1/partitioned-topic");
         Awaitility.await().untilAsserted(() -> {
-            assertFalse(pulsar.getPulsarResources().getNamespaceResources().get(NamespaceName.get("p1/ns1").toString())
+            assertFalse(pulsar.getPulsarResources().getNamespaceResources().get(PulsarWebResource.path("policies",
+                NamespaceName.get("p1/ns1").toString()))
                     .get().auth_policies.getTopicAuthentication().containsKey(partitionedTopic));
             for (int i = 0; i < numPartitions; i++) {
-                assertFalse(pulsar.getPulsarResources().getNamespaceResources().get(NamespaceName.get("p1/ns1").toString())
+                assertFalse(pulsar.getPulsarResources().getNamespaceResources().get(PulsarWebResource.path("policies",
+                    NamespaceName.get("p1/ns1").toString()))
                         .get().auth_policies.getTopicAuthentication()
                         .containsKey(TopicName.get(partitionedTopic).getPartition(i).toString()));
             }
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
index a11e11e..efe353a 100644
--- a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
@@ -275,12 +275,6 @@ PULSAR_PUBLIC void pulsar_consumer_set_subscription_initial_position(
 PULSAR_PUBLIC void pulsar_consumer_configuration_set_property(pulsar_consumer_configuration_t *conf,
                                                               const char *name, const char *value);
 
-PULSAR_PUBLIC void pulsar_consumer_configuration_set_priority_level(
-    pulsar_consumer_configuration_t *consumer_configuration, int priority_level);
-
-PULSAR_PUBLIC int pulsar_consumer_configuration_get_priority_level(
-    pulsar_consumer_configuration_t *consumer_configuration);
-
 // const CryptoKeyReaderPtr getCryptoKeyReader()
 //
 // const;
diff --git a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
index aaec12c..90c60df 100644
--- a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
@@ -185,13 +185,3 @@ int pulsar_consumer_get_subscription_initial_position(
     pulsar_consumer_configuration_t *consumer_configuration) {
     return consumer_configuration->consumerConfiguration.getSubscriptionInitialPosition();
 }
-
-void pulsar_consumer_configuration_set_priority_level(pulsar_consumer_configuration_t *consumer_configuration,
-                                                      int priority_level) {
-    consumer_configuration->consumerConfiguration.setPriorityLevel(priority_level);
-}
-
-int pulsar_consumer_configuration_get_priority_level(
-    pulsar_consumer_configuration_t *consumer_configuration) {
-    return consumer_configuration->consumerConfiguration.getPriorityLevel();
-}