You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/19 00:49:37 UTC

[pulsar] 22/26: [pulsar-function] fix broken backward compatibility with v1-namespace while registering function (#4224)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 20695e92678cb44db264b0d3d4fe903263de922e
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue May 7 16:20:15 2019 -0700

    [pulsar-function] fix broken backward compatibility with v1-namespace while registering function (#4224)
---
 .../pulsar/functions/worker/rest/api/ComponentImpl.java      | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index d844972..bb9a991 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -316,10 +316,14 @@ public abstract class ComponentImpl {
             final TenantInfo tenantInfo = worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
 
             String qualifiedNamespace = tenant + "/" + namespace;
-            if (!worker().getBrokerAdmin().namespaces().getNamespaces(tenant).contains(qualifiedNamespace)) {
-                log.error("{}/{}/{} Namespace {} does not exist", tenant, namespace,
-                        componentName, namespace);
-                throw new RestException(Status.BAD_REQUEST, "Namespace does not exist");
+            List<String> namespaces = worker().getBrokerAdmin().namespaces().getNamespaces(tenant);
+            if (namespaces != null && !namespaces.contains(qualifiedNamespace)) {
+                String qualifiedNamespaceWithCluster = String.format("%s/%s/%s", tenant,
+                        worker().getWorkerConfig().getPulsarFunctionsCluster(), namespace);
+                if (namespaces != null && !namespaces.contains(qualifiedNamespaceWithCluster)) {
+                    log.error("{}/{}/{} Namespace {} does not exist", tenant, namespace, componentName, namespace);
+                    throw new RestException(Status.BAD_REQUEST, "Namespace does not exist");
+                }
             }
         } catch (PulsarAdminException.NotAuthorizedException e) {
             log.error("{}/{}/{} Client [{}] is not admin and authorized to operate {} on tenant", tenant, namespace,