You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/01 02:24:21 UTC
[pulsar] branch branch-2.7 updated: cherry pick 14269 to branch 2.7 (#16848)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new c9e3a23d50c cherry pick 14269 to branch 2.7 (#16848)
c9e3a23d50c is described below
commit c9e3a23d50c5b7ab48841d4886d05c30756830e7
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Mon Aug 1 10:24:15 2022 +0800
cherry pick 14269 to branch 2.7 (#16848)
---
.../apache/pulsar/broker/admin/AdminResource.java | 2 +-
.../pulsar/broker/web/PulsarWebResource.java | 4 +-
.../pulsar/broker/admin/PersistentTopicsTest.java | 79 +++++++++++++++-------
3 files changed, 58 insertions(+), 27 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index f0c028a8146..724c1c759cf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -340,7 +340,7 @@ public abstract class AdminResource extends PulsarWebResource {
} catch (IllegalArgumentException e) {
throw new RestException(Status.PRECONDITION_FAILED, "Tenant name or namespace is not valid");
} catch (RestException re) {
- throw new RestException(Status.PRECONDITION_FAILED, "Namespace does not have any clusters configured");
+ throw re;
} catch (Exception e) {
log.warn("Failed to validate global cluster configuration : ns={} emsg={}", namespace, e.getMessage());
throw new RestException(Status.SERVICE_UNAVAILABLE, "Failed to validate global cluster configuration");
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 1130810d0ae..8b39ceefb28 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -786,9 +786,9 @@ public abstract class PulsarWebResource {
validationFuture.complete(null);
}
} else {
- String msg = String.format("Policies not found for %s namespace", namespace.toString());
+ String msg = String.format("Namespace %s not found", namespace.toString());
log.warn(msg);
- validationFuture.completeExceptionally(new RestException(Status.NOT_FOUND, msg));
+ validationFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Namespace not found"));
}
}).exceptionally(ex -> {
String msg = String.format("Failed to validate global cluster configuration : cluster=%s ns=%s emsg=%s",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 4dec97a1ea8..288373111d8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -18,15 +18,38 @@
*/
package org.apache.pulsar.broker.admin;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import java.lang.reflect.Field;
import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriInfo;
import org.apache.pulsar.broker.admin.v2.NonPersistentTopics;
import org.apache.pulsar.broker.admin.v2.PersistentTopics;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
@@ -34,7 +57,6 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -45,11 +67,14 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
import org.apache.zookeeper.KeeperException;
import org.mockito.ArgumentCaptor;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -57,29 +82,6 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import javax.ws.rs.container.AsyncResponse;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.UriInfo;
-import java.lang.reflect.Field;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.verify;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-
@PrepareForTest(PersistentTopics.class)
@PowerMockIgnore("com.sun.management.*")
public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
@@ -373,6 +375,35 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
Assert.assertEquals(metadata.partitions, 0);
}
+ @Test
+ public void testCreateTopicWithReplicationCluster() {
+ final String topicName = "test-topic-ownership";
+ CompletableFuture<Optional<Policies>> policyFuture = new CompletableFuture<>();
+ Policies policies = new Policies();
+ policyFuture.complete(Optional.of(policies));
+ ConfigurationCacheService configurationCacheService = spy(pulsar.getConfigurationCache());
+ when(pulsar.getConfigurationCache()).thenReturn(configurationCacheService);
+ ZooKeeperDataCache<Policies> policiesZooKeeperDataCache = spy(pulsar.getConfigurationCache().policiesCache());
+ when(pulsar.getConfigurationCache().policiesCache()).thenReturn(policiesZooKeeperDataCache);
+ doReturn(policyFuture).when(policiesZooKeeperDataCache).getAsync(any(String.class));
+ AsyncResponse response = mock(AsyncResponse.class);
+ ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class);
+ persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, 2, true);
+ verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
+ Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.PRECONDITION_FAILED.getStatusCode());
+ Assert.assertTrue(errCaptor.getValue().getMessage().contains("Namespace does not have any clusters configured"));
+ // Test policy not exist and return 'Namespace not found'
+ CompletableFuture<Optional<Policies>> policyFuture2 = new CompletableFuture<>();
+ policyFuture2.complete(Optional.empty());
+ doReturn(policyFuture2).when(policiesZooKeeperDataCache).getAsync(any(String.class));
+ response = mock(AsyncResponse.class);
+ errCaptor = ArgumentCaptor.forClass(RestException.class);
+ persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, 2, true);
+ verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
+ Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode());
+ Assert.assertTrue(errCaptor.getValue().getMessage().contains("Namespace not found"));
+ }
+
@Test(expectedExceptions = RestException.class)
public void testCreateNonPartitionedTopicWithInvalidName() {
final String topicName = "standard-topic-partition-10";