You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/01 02:24:21 UTC

[pulsar] branch branch-2.7 updated: cherry pick 14269 to branch 2.7 (#16848)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new c9e3a23d50c cherry pick 14269 to branch 2.7 (#16848)
c9e3a23d50c is described below

commit c9e3a23d50c5b7ab48841d4886d05c30756830e7
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Mon Aug 1 10:24:15 2022 +0800

    cherry pick 14269 to branch 2.7 (#16848)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  2 +-
 .../pulsar/broker/web/PulsarWebResource.java       |  4 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 79 +++++++++++++++-------
 3 files changed, 58 insertions(+), 27 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index f0c028a8146..724c1c759cf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -340,7 +340,7 @@ public abstract class AdminResource extends PulsarWebResource {
         } catch (IllegalArgumentException e) {
             throw new RestException(Status.PRECONDITION_FAILED, "Tenant name or namespace is not valid");
         } catch (RestException re) {
-            throw new RestException(Status.PRECONDITION_FAILED, "Namespace does not have any clusters configured");
+            throw re;
         } catch (Exception e) {
             log.warn("Failed to validate global cluster configuration : ns={}  emsg={}", namespace, e.getMessage());
             throw new RestException(Status.SERVICE_UNAVAILABLE, "Failed to validate global cluster configuration");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 1130810d0ae..8b39ceefb28 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -786,9 +786,9 @@ public abstract class PulsarWebResource {
                     validationFuture.complete(null);
                 }
             } else {
-                String msg = String.format("Policies not found for %s namespace", namespace.toString());
+                String msg = String.format("Namespace %s not found", namespace.toString());
                 log.warn(msg);
-                validationFuture.completeExceptionally(new RestException(Status.NOT_FOUND, msg));
+                validationFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Namespace not found"));
             }
         }).exceptionally(ex -> {
             String msg = String.format("Failed to validate global cluster configuration : cluster=%s ns=%s  emsg=%s",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 4dec97a1ea8..288373111d8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -18,15 +18,38 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import java.lang.reflect.Field;
 import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriInfo;
 import org.apache.pulsar.broker.admin.v2.NonPersistentTopics;
 import org.apache.pulsar.broker.admin.v2.PersistentTopics;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
@@ -34,7 +57,6 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -45,11 +67,14 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
 import org.apache.zookeeper.KeeperException;
 import org.mockito.ArgumentCaptor;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -57,29 +82,6 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.UriInfo;
-import java.lang.reflect.Field;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-
 @PrepareForTest(PersistentTopics.class)
 @PowerMockIgnore("com.sun.management.*")
 public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
@@ -373,6 +375,35 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         Assert.assertEquals(metadata.partitions, 0);
     }
 
+    @Test
+    public void testCreateTopicWithReplicationCluster() {
+        final String topicName = "test-topic-ownership";
+        CompletableFuture<Optional<Policies>> policyFuture = new CompletableFuture<>();
+        Policies policies = new Policies();
+        policyFuture.complete(Optional.of(policies));
+        ConfigurationCacheService configurationCacheService = spy(pulsar.getConfigurationCache());
+        when(pulsar.getConfigurationCache()).thenReturn(configurationCacheService);
+        ZooKeeperDataCache<Policies> policiesZooKeeperDataCache = spy(pulsar.getConfigurationCache().policiesCache());
+        when(pulsar.getConfigurationCache().policiesCache()).thenReturn(policiesZooKeeperDataCache);
+        doReturn(policyFuture).when(policiesZooKeeperDataCache).getAsync(any(String.class));
+        AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, 2, true);
+        verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
+        Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.PRECONDITION_FAILED.getStatusCode());
+        Assert.assertTrue(errCaptor.getValue().getMessage().contains("Namespace does not have any clusters configured"));
+        // Test policy not exist and return 'Namespace not found'
+        CompletableFuture<Optional<Policies>> policyFuture2 = new CompletableFuture<>();
+        policyFuture2.complete(Optional.empty());
+        doReturn(policyFuture2).when(policiesZooKeeperDataCache).getAsync(any(String.class));
+        response = mock(AsyncResponse.class);
+        errCaptor = ArgumentCaptor.forClass(RestException.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, 2, true);
+        verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
+        Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode());
+        Assert.assertTrue(errCaptor.getValue().getMessage().contains("Namespace not found"));
+    }
+
     @Test(expectedExceptions = RestException.class)
     public void testCreateNonPartitionedTopicWithInvalidName() {
         final String topicName = "standard-topic-partition-10";