You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/07 02:58:59 UTC

[pulsar] branch branch-2.7 updated: fix get partition metadata problem for a non-existed topic (#8818)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new bb8684d  fix get partition metadata problem for a non-existed topic (#8818)
bb8684d is described below

commit bb8684d8febba8b4f7ddb1472407fad48674aea8
Author: Aloys <lo...@gmail.com>
AuthorDate: Thu Jan 7 09:14:06 2021 +0800

    fix get partition metadata problem for a non-existed topic (#8818)
    
    Fixes #8813
    
    Currently, getting the partition metadata for a non-existed topic, it returns 0 instead of throwing an exception.
    This pr fix this by  throwing an exception.
    
    If no metadata found in global zk, then will check whether the topic is exist, if yes, will return 0, otherwise will  throw an exception.
    
    (cherry picked from commit a3dfb2a40979a2b9087b078e78044b91be558a76)
---
 .../apache/pulsar/broker/admin/AdminResource.java   | 21 +++++++++++++++++++++
 .../pulsar/broker/admin/v2/PersistentTopics.java    |  1 +
 .../org/apache/pulsar/broker/web/RestException.java | 11 +++++++++++
 .../apache/pulsar/broker/admin/AdminApiTest.java    |  1 -
 .../pulsar/broker/admin/PersistentTopicsTest.java   |  8 ++++++++
 .../pulsar/client/api/PartitionCreationTest.java    |  2 +-
 .../pulsar/client/api/PulsarClientException.java    |  2 ++
 7 files changed, 44 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 1be8d75..02181e1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.naming.Constants;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
@@ -373,6 +374,26 @@ public abstract class AdminResource extends PulsarWebResource {
         }
     }
 
+    protected void validateTopicExistedAndCheckAllowAutoCreation(String tenant, String namespace,
+                                                                 String encodedTopic, boolean checkAllowAutoCreation) {
+        try {
+            PartitionedTopicMetadata partitionedTopicMetadata =
+                    pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).get();
+            if (partitionedTopicMetadata.partitions < 1) {
+                if (!pulsar().getNamespaceService().checkTopicExists(topicName).get()
+                        && checkAllowAutoCreation
+                        && !pulsar().getBrokerService().isAllowAutoTopicCreation(topicName)) {
+                    throw new RestException(Status.NOT_FOUND,
+                            new PulsarClientException.NotFoundException("Topic not exist"));
+                }
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            log.error("Failed to validate topic existed {}://{}/{}/{}",
+                    domain(), tenant, namespace, topicName, e);
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, "Check topic partition meta failed.");
+        }
+    }
+
     @Deprecated
     protected void validateTopicName(String property, String cluster, String namespace, String encodedTopic) {
         String topic = Codec.decode(encodedTopic);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index d26b9eb..18ab1b0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -740,6 +740,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Is check configuration required to automatically create topic")
             @QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
         validateTopicName(tenant, namespace, encodedTopic);
+        validateTopicExistedAndCheckAllowAutoCreation(tenant, namespace, encodedTopic, checkAllowAutoCreation);
         return internalGetPartitionedMetadata(authoritative, checkAllowAutoCreation);
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java
index f552333..67d4add 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestException.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.common.policies.data.ErrorData;
  */
 @SuppressWarnings("serial")
 public class RestException extends WebApplicationException {
+    private Throwable cause = null;
     static String getExceptionData(Throwable t) {
         StringWriter writer = new StringWriter();
         writer.append("\n --- An unexpected error occurred in the server ---\n\n");
@@ -58,6 +59,16 @@ public class RestException extends WebApplicationException {
         super(getResponse(t));
     }
 
+    public RestException(Response.Status status, Throwable t) {
+        this(status.getStatusCode(), t.getMessage());
+        this.cause = t;
+    }
+
+    @Override
+    public Throwable getCause() {
+        return cause;
+    }
+
     public RestException(PulsarAdminException cae) {
         this(cae.getStatusCode(), cae.getHttpError());
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 67d2117..930c975 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -850,7 +850,6 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
 
         assertEquals(admin.topics().getPartitionedTopicMetadata("persistent://prop-xyz/ns1/ds2").partitions,
                 0);
-
         // check the getPartitionedStats for PartitionedTopic returns only partitions metadata, and no partitions info
         assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
                 admin.topics().getPartitionedStats(partitionedTopicName,false).metadata.partitions);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 67bf32b..0c787ee 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -267,6 +267,10 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         Assert.assertEquals(persistentTopics
                         .getPartitionedMetadata(testTenant, testNamespace, nonPartitionTopic, true, false).partitions,
                 0);
+
+        Assert.assertEquals(persistentTopics
+                        .getPartitionedMetadata(testTenant, testNamespace, nonPartitionTopic, true, true).partitions,
+                0);
     }
 
     @Test
@@ -276,6 +280,10 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         PartitionedTopicMetadata pMetadata = persistentTopics.getPartitionedMetadata(
                 testTenant, testNamespace, topicName, true, false);
         Assert.assertEquals(pMetadata.partitions, 0);
+
+        PartitionedTopicMetadata metadata = persistentTopics.getPartitionedMetadata(
+                testTenant, testNamespace, topicName, true, true);
+        Assert.assertEquals(metadata.partitions, 0);
     }
 
     @Test(expectedExceptions = RestException.class)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
index dfdb25f..f223272 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java
@@ -83,7 +83,7 @@ public class PartitionCreationTest extends ProducerConsumerBase {
                 // passed non persistent topic here since we can not avoid auto creation on non persistent topic now.
                 Assert.assertNotNull(consumer);
             }
-        } catch (PulsarClientException.TopicDoesNotExistException e) {
+        } catch (PulsarClientException.TopicDoesNotExistException | PulsarClientException.NotFoundException e) {
             //ok
         }
     }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index f49ea52..e7b6640 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -972,6 +972,8 @@ public class PulsarClientException extends IOException {
             return new TransactionConflictException(msg);
         } else if (cause instanceof TopicDoesNotExistException) {
             return new TopicDoesNotExistException(msg);
+        } else if (cause instanceof NotFoundException) {
+            return new NotFoundException(msg);
         } else {
             return new PulsarClientException(t);
         }