You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/07/22 14:59:52 UTC

[pulsar] branch master updated: Add options to rewrite namespace delimiter for pulsar sql. (#4749)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ddd51f  Add options to rewrite namespace delimiter for pulsar sql. (#4749)
6ddd51f is described below

commit 6ddd51ff45999bd7daa238073e9bcfd87d1df16a
Author: lipenghui <pe...@apache.org>
AuthorDate: Mon Jul 22 22:59:43 2019 +0800

    Add options to rewrite namespace delimiter for pulsar sql. (#4749)
    
    ### Motivation
    
    Fix #4732
    
    ### Modifications
    
    Add options to rewrite the namespace delimiter, disable by default
    
    Enable rewrite namespace delimiter can work well with superset:
    <img width="1279" alt="superset" src="https://user-images.githubusercontent.com/12592133/61385412-f0f35700-a8e4-11e9-87b2-a31b62128b58.png">
    
    
    ### Does this pull request potentially affect one of the following parts:
    
    *If `yes` was chosen, please highlight the changes*
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API: (no)
      - The schema: (no)
      - The default values of configurations: (no)
      - The wire protocol: (no)
      - The rest endpoints: (no)
      - The admin cli options: (no)
      - Anything that affects deployment: (no)
    
    ### Documentation
    
      - Does this pull request introduce a new feature? (no)
---
 conf/presto/catalog/pulsar.properties              |  7 +-
 .../apache/pulsar/common/naming/NamedEntity.java   |  2 +-
 .../pulsar/sql/presto/PulsarConnectorConfig.java   | 33 +++++++++
 .../pulsar/sql/presto/PulsarConnectorFactory.java  |  4 +-
 .../pulsar/sql/presto/PulsarConnectorUtils.java    | 14 ++++
 .../apache/pulsar/sql/presto/PulsarMetadata.java   | 27 +++++---
 .../pulsar/sql/presto/PulsarSplitManager.java      | 11 +--
 .../pulsar/sql/presto/TestPulsarConnector.java     | 18 +++++
 .../sql/presto/TestPulsarConnectorConfig.java      | 52 ++++++++++++++
 .../pulsar/sql/presto/TestPulsarMetadata.java      | 81 +++++++++++++---------
 .../pulsar/sql/presto/TestPulsarSplitManager.java  | 23 +++---
 11 files changed, 210 insertions(+), 62 deletions(-)

diff --git a/conf/presto/catalog/pulsar.properties b/conf/presto/catalog/pulsar.properties
index 7f191e5..9387296 100644
--- a/conf/presto/catalog/pulsar.properties
+++ b/conf/presto/catalog/pulsar.properties
@@ -30,7 +30,12 @@ pulsar.target-num-splits=2
 # max message queue size
 pulsar.max-split-message-queue-size=10000
 # max entry queue size
-pulsar.max-split-entry-queue-size = 1000
+pulsar.max-split-entry-queue-size=1000
+# Rewrite namespace delimiter
+# Warn: avoid using symbols allowed by Namespace (a-zA-Z_0-9 -=:%)
+# to prevent erroneous rewriting
+pulsar.namespace-delimiter-rewrite-enable=false
+pulsar.rewrite-namespace-delimiter=/
 
 
 ####### TIERED STORAGE OFFLOADER CONFIGS #######
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamedEntity.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamedEntity.java
index e5180d6..8234f9b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamedEntity.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamedEntity.java
@@ -31,7 +31,7 @@ public class NamedEntity {
     // allowed characters for property, namespace, cluster and topic names are
     // alphanumeric (a-zA-Z_0-9) and these special chars -=:.
     // % is allowed as part of valid URL encoding
-    private static final Pattern NAMED_ENTITY_PATTERN = Pattern.compile("^[-=:.\\w]*$");
+    public static final Pattern NAMED_ENTITY_PATTERN = Pattern.compile("^[-=:.\\w]*$");
 
     public static void checkName(String name) throws IllegalArgumentException {
         Matcher m = NAMED_ENTITY_PATTERN.matcher(name);
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
index a6b625f..b10c040 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
@@ -23,12 +23,14 @@ import io.airlift.configuration.Config;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.pulsar.common.naming.NamedEntity;
 import org.apache.pulsar.common.protocol.Commands;
 
 import javax.validation.constraints.NotNull;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.regex.Matcher;
 
 public class PulsarConnectorConfig implements AutoCloseable {
 
@@ -40,8 +42,12 @@ public class PulsarConnectorConfig implements AutoCloseable {
     private int maxSplitEntryQueueSize = 1000;
     private int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
     private String statsProvider = NullStatsProvider.class.getName();
+
     private Map<String, String> statsProviderConfigs = new HashMap<>();
 
+    private boolean namespaceDelimiterRewriteEnable = false;
+    private String rewriteNamespaceDelimiter = "/";
+
     /**** --- Ledger Offloading --- ****/
     private String managedLedgerOffloadDriver = null;
     private int managedLedgerOffloadMaxThreads = 2;
@@ -191,6 +197,33 @@ public class PulsarConnectorConfig implements AutoCloseable {
         return this;
     }
 
+    public String getRewriteNamespaceDelimiter() {
+        return rewriteNamespaceDelimiter;
+    }
+
+    @Config("pulsar.rewrite-namespace-delimiter")
+    public PulsarConnectorConfig setRewriteNamespaceDelimiter(String rewriteNamespaceDelimiter) {
+        Matcher m = NamedEntity.NAMED_ENTITY_PATTERN.matcher(rewriteNamespaceDelimiter);
+        if (m.matches()) {
+            throw new IllegalArgumentException(
+                    "Can't use " + rewriteNamespaceDelimiter + "as delimiter, "
+                    + "because delimiter must contain characters which name of namespace not allowed"
+            );
+        }
+        this.rewriteNamespaceDelimiter = rewriteNamespaceDelimiter;
+        return this;
+    }
+
+    public boolean getNamespaceDelimiterRewriteEnable() {
+        return namespaceDelimiterRewriteEnable;
+    }
+
+    @Config("pulsar.namespace-delimiter-rewrite-enable")
+    public PulsarConnectorConfig setNamespaceDelimiterRewriteEnable(boolean namespaceDelimiterRewriteEnable) {
+        this.namespaceDelimiterRewriteEnable = namespaceDelimiterRewriteEnable;
+        return this;
+    }
+
     @NotNull
     public PulsarAdmin getPulsarAdmin() throws PulsarClientException {
         if (this.pulsarAdmin == null) {
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorFactory.java
index c119a9c..0719e8e 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorFactory.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorFactory.java
@@ -49,7 +49,9 @@ public class PulsarConnectorFactory implements ConnectorFactory {
     @Override
     public Connector create(String connectorId, Map<String, String> config, ConnectorContext context) {
         requireNonNull(config, "requiredConfig is null");
-        log.debug("Creating Pulsar connector with configs: %s", config);
+        if (log.isDebugEnabled()) {
+            log.debug("Creating Pulsar connector with configs: %s", config);
+        }
         try {
             // A plugin is not required to use Guice; it is just very convenient
             Bootstrap app = new Bootstrap(
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java
index ee256f6..b14cb87 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java
@@ -83,4 +83,18 @@ public class PulsarConnectorUtils {
         }
         return properties;
     }
+
+
+    public static String rewriteNamespaceDelimiterIfNeeded(String namespace, PulsarConnectorConfig config) {
+        return config.getNamespaceDelimiterRewriteEnable()
+                ? namespace.replace("/", config.getRewriteNamespaceDelimiter())
+                : namespace;
+    }
+
+    public static String restoreNamespaceDelimiterIfNeeded(String namespace, PulsarConnectorConfig config) {
+        return config.getNamespaceDelimiterRewriteEnable()
+                ? namespace.replace(config.getRewriteNamespaceDelimiter(), "/")
+                : namespace;
+    }
+
 }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
index cce44b2..3647c1b 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
@@ -66,6 +66,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.Stack;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
 import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
@@ -75,11 +76,14 @@ import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
 import static java.util.Objects.requireNonNull;
 import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertColumnHandle;
 import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertTableHandle;
+import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded;
+import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded;
 
 public class PulsarMetadata implements ConnectorMetadata {
 
     private final String connectorId;
     private final PulsarAdmin pulsarAdmin;
+    private final PulsarConnectorConfig pulsarConnectorConfig;
 
     private static final String INFORMATION_SCHEMA = "information_schema";
 
@@ -88,6 +92,7 @@ public class PulsarMetadata implements ConnectorMetadata {
     @Inject
     public PulsarMetadata(PulsarConnectorId connectorId, PulsarConnectorConfig pulsarConnectorConfig) {
         this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+        this.pulsarConnectorConfig = pulsarConnectorConfig;
         try {
             this.pulsarAdmin = pulsarConnectorConfig.getPulsarAdmin();
         } catch (PulsarClientException e) {
@@ -101,7 +106,8 @@ public class PulsarMetadata implements ConnectorMetadata {
         try {
             List<String> tenants = pulsarAdmin.tenants().getTenants();
             for (String tenant : tenants) {
-                prestoSchemas.addAll(pulsarAdmin.namespaces().getNamespaces(tenant));
+                prestoSchemas.addAll(pulsarAdmin.namespaces().getNamespaces(tenant).stream().map(namespace ->
+                        rewriteNamespaceDelimiterIfNeeded(namespace, pulsarConnectorConfig)).collect(Collectors.toList()));
             }
         } catch (PulsarAdminException e) {
             throw new RuntimeException("Failed to get schemas from pulsar: "
@@ -157,7 +163,7 @@ public class PulsarMetadata implements ConnectorMetadata {
             } else {
                 List<String> pulsarTopicList = null;
                 try {
-                    pulsarTopicList = this.pulsarAdmin.topics().getList(schemaNameOrNull);
+                    pulsarTopicList = this.pulsarAdmin.topics().getList(restoreNamespaceDelimiterIfNeeded(schemaNameOrNull, pulsarConnectorConfig));
                 } catch (PulsarAdminException e) {
                     if (e.getStatusCode() == 404) {
                         log.warn("Schema " + schemaNameOrNull + " does not exsit");
@@ -250,28 +256,29 @@ public class PulsarMetadata implements ConnectorMetadata {
         if (schemaTableName.getSchemaName().equals(INFORMATION_SCHEMA)) {
             return null;
         }
+        String namespace = restoreNamespaceDelimiterIfNeeded(schemaTableName.getSchemaName(), pulsarConnectorConfig);
 
         TopicName topicName = TopicName.get(
-                String.format("%s/%s", schemaTableName.getSchemaName(), schemaTableName.getTableName()));
+                String.format("%s/%s", namespace, schemaTableName.getTableName()));
 
         List<String> topics;
         try {
             if (!PulsarConnectorUtils.isPartitionedTopic(topicName, this.pulsarAdmin)) {
-                topics = this.pulsarAdmin.topics().getList(schemaTableName.getSchemaName());
+                topics = this.pulsarAdmin.topics().getList(namespace);
             } else {
-                topics = this.pulsarAdmin.topics().getPartitionedTopicList((schemaTableName.getSchemaName()));
+                topics = this.pulsarAdmin.topics().getPartitionedTopicList(namespace);
             }
         } catch (PulsarAdminException e) {
             if (e.getStatusCode() == 404) {
-                throw new PrestoException(NOT_FOUND, "Schema " + schemaTableName.getSchemaName() + " does not exist");
+                throw new PrestoException(NOT_FOUND, "Schema " + namespace + " does not exist");
             }
-            throw new RuntimeException("Failed to get topics in schema " + schemaTableName.getSchemaName()
+            throw new RuntimeException("Failed to get topics in schema " + namespace
                     + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
         }
 
         if (!topics.contains(topicName.toString())) {
             log.error("Table %s not found",
-                    String.format("%s/%s", schemaTableName.getSchemaName(),
+                    String.format("%s/%s", namespace,
                             schemaTableName.getTableName()));
             throw new TableNotFoundException(schemaTableName);
         }
@@ -279,14 +286,14 @@ public class PulsarMetadata implements ConnectorMetadata {
         SchemaInfo schemaInfo;
         try {
             schemaInfo = this.pulsarAdmin.schemas().getSchemaInfo(
-                    String.format("%s/%s", schemaTableName.getSchemaName(), schemaTableName.getTableName()));
+                    String.format("%s/%s", namespace, schemaTableName.getTableName()));
         } catch (PulsarAdminException e) {
             if (e.getStatusCode() == 404) {
                 // to indicate that we can't read from topic because there is no schema
                 return null;
             }
             throw new RuntimeException("Failed to get schema information for topic "
-                    + String.format("%s/%s", schemaTableName.getSchemaName(), schemaTableName.getTableName())
+                    + String.format("%s/%s", namespace, schemaTableName.getTableName())
                     + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
         }
 
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
index 445564a..53195a9 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
@@ -60,6 +60,7 @@ import java.util.List;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 import static org.apache.bookkeeper.mledger.ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries;
+import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded;
 
 public class PulsarSplitManager implements ConnectorSplitManager {
 
@@ -94,16 +95,18 @@ public class PulsarSplitManager implements ConnectorSplitManager {
         PulsarTableHandle tableHandle = layoutHandle.getTable();
         TupleDomain<ColumnHandle> tupleDomain = layoutHandle.getTupleDomain();
 
-        TopicName topicName = TopicName.get("persistent", NamespaceName.get(tableHandle.getSchemaName()),
+        String namespace = restoreNamespaceDelimiterIfNeeded(tableHandle.getSchemaName(), pulsarConnectorConfig);
+        TopicName topicName = TopicName.get("persistent", NamespaceName.get(namespace),
                 tableHandle.getTableName());
 
         SchemaInfo schemaInfo;
+
         try {
             schemaInfo = this.pulsarAdmin.schemas().getSchemaInfo(
-                    String.format("%s/%s", tableHandle.getSchemaName(), tableHandle.getTableName()));
+                    String.format("%s/%s", namespace, tableHandle.getTableName()));
         } catch (PulsarAdminException e) {
             throw new RuntimeException("Failed to get schema for topic "
-                    + String.format("%s/%s", tableHandle.getSchemaName(), tableHandle.getTableName())
+                    + String.format("%s/%s", namespace, tableHandle.getTableName())
                     + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
         }
 
@@ -258,7 +261,7 @@ public class PulsarSplitManager implements ConnectorSplitManager {
                 PositionImpl endPosition = (PositionImpl) readOnlyCursor.getReadPosition();
 
                 splits.add(new PulsarSplit(i, this.connectorId,
-                        tableHandle.getSchemaName(),
+                        restoreNamespaceDelimiterIfNeeded(tableHandle.getSchemaName(), pulsarConnectorConfig),
                         tableName,
                         entriesForSplit,
                         new String(schemaInfo.getSchema()),
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index fe3ac57..6a5cd4f 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -38,6 +38,7 @@ import org.apache.bookkeeper.mledger.impl.EntryImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.mledger.impl.ReadOnlyCursorImpl;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -63,6 +64,7 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import java.time.LocalDate;
@@ -934,4 +936,20 @@ public abstract class TestPulsarConnector {
     public void cleanup() {
         completedBytes = 0L;
     }
+
+    @DataProvider(name = "rewriteNamespaceDelimiter")
+    public static Object[][] serviceUrls() {
+        return new Object[][] {
+                { "|" }, { null }
+        };
+    }
+
+    protected void updateRewriteNamespaceDelimiterIfNeeded(String delimiter) {
+        if (StringUtils.isNotBlank(delimiter)) {
+            pulsarConnectorConfig.setNamespaceDelimiterRewriteEnable(true);
+            pulsarConnectorConfig.setRewriteNamespaceDelimiter(delimiter);
+        } else {
+            pulsarConnectorConfig.setNamespaceDelimiterRewriteEnable(false);
+        }
+    }
 }
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java
new file mode 100644
index 0000000..82b8c97
--- /dev/null
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestPulsarConnectorConfig {
+
+    @Test
+    public void testDefaultNamespaceDelimiterRewrite() {
+        PulsarConnectorConfig connectorConfig = new PulsarConnectorConfig();
+        Assert.assertFalse(connectorConfig.getNamespaceDelimiterRewriteEnable());
+        Assert.assertEquals("/", connectorConfig.getRewriteNamespaceDelimiter());
+    }
+
+    @Test
+    public void testNamespaceRewriteDelimiterRestriction() {
+        PulsarConnectorConfig connectorConfig = new PulsarConnectorConfig();
+        try {
+            connectorConfig.setRewriteNamespaceDelimiter("-=:.Az09_");
+        } catch (Exception e) {
+            Assert.assertTrue(e instanceof IllegalArgumentException);
+        }
+        connectorConfig.setRewriteNamespaceDelimiter("|");
+        Assert.assertEquals("|", (connectorConfig.getRewriteNamespaceDelimiter()));
+        connectorConfig.setRewriteNamespaceDelimiter("||");
+        Assert.assertEquals("||", (connectorConfig.getRewriteNamespaceDelimiter()));
+        connectorConfig.setRewriteNamespaceDelimiter("$");
+        Assert.assertEquals("$", (connectorConfig.getRewriteNamespaceDelimiter()));
+        connectorConfig.setRewriteNamespaceDelimiter("&");
+        Assert.assertEquals("&", (connectorConfig.getRewriteNamespaceDelimiter()));
+        connectorConfig.setRewriteNamespaceDelimiter("--&");
+        Assert.assertEquals("--&", (connectorConfig.getRewriteNamespaceDelimiter()));
+    }
+}
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
index dfd723e..40bf57c 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
@@ -29,6 +29,7 @@ import com.facebook.presto.spi.SchemaTablePrefix;
 import com.facebook.presto.spi.TableNotFoundException;
 import io.airlift.log.Logger;
 import org.apache.avro.Schema;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -59,19 +60,29 @@ public class TestPulsarMetadata extends TestPulsarConnector {
 
     private static final Logger log = Logger.get(TestPulsarMetadata.class);
 
-    @Test
-    public void testListSchemaNames() {
-
+    @Test(dataProvider = "rewriteNamespaceDelimiter")
+    public void testListSchemaNames(String delimiter) {
+        updateRewriteNamespaceDelimiterIfNeeded(delimiter);
         List<String> schemas = this.pulsarMetadata.listSchemaNames(mock(ConnectorSession.class));
 
-        String[] expectedSchemas = {NAMESPACE_NAME_1.toString(), NAMESPACE_NAME_2.toString(),
-                NAMESPACE_NAME_3.toString(), NAMESPACE_NAME_4.toString()};
-        assertEquals(new HashSet<>(schemas), new HashSet<>(Arrays.asList(expectedSchemas)));
-    }
 
-    @Test
-    public void testGetTableHandle() {
+        if (StringUtils.isBlank(delimiter)) {
+            String[] expectedSchemas = {NAMESPACE_NAME_1.toString(), NAMESPACE_NAME_2.toString(),
+                    NAMESPACE_NAME_3.toString(), NAMESPACE_NAME_4.toString()};
+            assertEquals(new HashSet<>(schemas), new HashSet<>(Arrays.asList(expectedSchemas)));
+        } else {
+            String[] expectedSchemas = {
+                    PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded(NAMESPACE_NAME_1.toString(), pulsarConnectorConfig),
+                    PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded(NAMESPACE_NAME_2.toString(), pulsarConnectorConfig),
+                    PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded(NAMESPACE_NAME_3.toString(), pulsarConnectorConfig),
+                    PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded(NAMESPACE_NAME_4.toString(), pulsarConnectorConfig)};
+            assertEquals(new HashSet<>(schemas), new HashSet<>(Arrays.asList(expectedSchemas)));
+        }
+    }
 
+    @Test(dataProvider = "rewriteNamespaceDelimiter")
+    public void testGetTableHandle(String delimiter) {
+        updateRewriteNamespaceDelimiterIfNeeded(delimiter);
         SchemaTableName schemaTableName = new SchemaTableName(TOPIC_1.getNamespace(), TOPIC_1.getLocalName());
 
         ConnectorTableHandle connectorTableHandle
@@ -87,9 +98,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
         assertEquals(pulsarTableHandle.getTopicName(), TOPIC_1.getLocalName());
     }
 
-    @Test
-    public void testGetTableMetadata() {
-
+    @Test(dataProvider = "rewriteNamespaceDelimiter")
+    public void testGetTableMetadata(String delimiter) {
+        updateRewriteNamespaceDelimiterIfNeeded(delimiter);
         List<TopicName> allTopics = new LinkedList<>();
         allTopics.addAll(topicNames);
         allTopics.addAll(partitionedTopicNames);
@@ -131,9 +142,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
         }
     }
 
-    @Test
-    public void testGetTableMetadataWrongSchema() {
-
+    @Test(dataProvider = "rewriteNamespaceDelimiter")
+    public void testGetTableMetadataWrongSchema(String delimiter) {
+        updateRewriteNamespaceDelimiterIfNeeded(delimiter);
         PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(
                 pulsarConnectorId.toString(),
                 "wrong-tenant/wrong-ns",
@@ -151,9 +162,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
         }
     }
 
-    @Test
-    public void testGetTableMetadataWrongTable() {
-
+    @Test(dataProvider = "rewriteNamespaceDelimiter")
+    public void testGetTableMetadataWrongTable(String delimiter) {
+        updateRewriteNamespaceDelimiterIfNeeded(delimiter);
         PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(
                 pulsarConnectorId.toString(),
                 TOPIC_1.getNamespace(),
@@ -171,9 +182,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
         }
     }
 
-    @Test
-    public void testGetTableMetadataTableNoSchema() throws PulsarAdminException {
-
+    @Test(dataProvider = "rewriteNamespaceDelimiter")
+    public void testGetTableMetadataTableNoSchema(String delimiter) throws PulsarAdminException {
+        updateRewriteNamespaceDelimiterIfNeeded(delimiter);
         when(this.schemas.getSchemaInfo(eq(TOPIC_1.getSchemaName()))).thenThrow(
                 new PulsarAdminException(new ClientErrorException(Response.Status.NOT_FOUND)));
 
@@ -190,9 +201,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
         assertEquals(tableMetadata.getColumns().size(), 0);
     }
 
-    @Test
-    public void testGetTableMetadataTableBlankSchema() throws PulsarAdminException {
-
+    @Test(dataProvider = "rewriteNamespaceDelimiter")
+    public void testGetTableMetadataTableBlankSchema(String delimiter) throws PulsarAdminException {
+        updateRewriteNamespaceDelimiterIfNeeded(delimiter);
         SchemaInfo badSchemaInfo = new SchemaInfo();
         badSchemaInfo.setSchema(new byte[0]);
         when(this.schemas.getSchemaInfo(eq(TOPIC_1.getSchemaName()))).thenReturn(badSchemaInfo);
@@ -215,9 +226,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
         }
     }
 
-    @Test
-    public void testGetTableMetadataTableInvalidSchema() throws PulsarAdminException {
-
+    @Test(dataProvider = "rewriteNamespaceDelimiter")
+    public void testGetTableMetadataTableInvalidSchema(String delimiter) throws PulsarAdminException {
+        updateRewriteNamespaceDelimiterIfNeeded(delimiter);
         SchemaInfo badSchemaInfo = new SchemaInfo();
         badSchemaInfo.setSchema("foo".getBytes());
         when(this.schemas.getSchemaInfo(eq(TOPIC_1.getSchemaName()))).thenReturn(badSchemaInfo);
@@ -240,8 +251,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
         }
     }
 
-    @Test
-    public void testListTable() {
+    @Test(dataProvider = "rewriteNamespaceDelimiter")
+    public void testListTable(String delimiter) {
+        updateRewriteNamespaceDelimiterIfNeeded(delimiter);
         assertTrue(this.pulsarMetadata.listTables(mock(ConnectorSession.class), null).isEmpty());
         assertTrue(this.pulsarMetadata.listTables(mock(ConnectorSession.class), "wrong-tenant/wrong-ns")
                 .isEmpty());
@@ -256,9 +268,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
                 NAMESPACE_NAME_4.toString())), new HashSet<>(Arrays.asList(expectedTopics2)));
     }
 
-    @Test
-    public void testGetColumnHandles() {
-
+    @Test(dataProvider = "rewriteNamespaceDelimiter")
+    public void testGetColumnHandles(String delimiter) {
+        updateRewriteNamespaceDelimiterIfNeeded(delimiter);
         PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(pulsarConnectorId.toString(), TOPIC_1.getNamespace(),
                 TOPIC_1.getLocalName(), TOPIC_1.getLocalName());
         Map<String, ColumnHandle> columnHandleMap
@@ -292,8 +304,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
         assertTrue(columnHandleMap.isEmpty());
     }
 
-    @Test
-    public void testListTableColumns() {
+    @Test(dataProvider = "rewriteNamespaceDelimiter")
+    public void testListTableColumns(String delimiter) {
+        updateRewriteNamespaceDelimiterIfNeeded(delimiter);
         Map<SchemaTableName, List<ColumnMetadata>> tableColumnsMap
                 = this.pulsarMetadata.listTableColumns(mock(ConnectorSession.class),
                 new SchemaTablePrefix(TOPIC_1.getNamespace()));
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
index 253803d..adeaf90 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
@@ -66,9 +66,9 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
         }
     }
 
-    @Test
-    public void testTopic() throws Exception {
-
+    @Test(dataProvider = "rewriteNamespaceDelimiter")
+    public void testTopic(String delimiter) throws Exception {
+        updateRewriteNamespaceDelimiterIfNeeded(delimiter);
         for (TopicName topicName : topicNames) {
             setup();
             log.info("!----- topic: %s -----!", topicName);
@@ -113,8 +113,9 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
 
     }
 
-    @Test
-    public void testPartitionedTopic() throws Exception {
+    @Test(dataProvider = "rewriteNamespaceDelimiter")
+    public void testPartitionedTopic(String delimiter) throws Exception {
+        updateRewriteNamespaceDelimiterIfNeeded(delimiter);
         for (TopicName topicName : partitionedTopicNames) {
             setup();
             log.info("!----- topic: %s -----!", topicName);
@@ -169,9 +170,9 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
         }).collect(Collectors.toList());
     }
 
-    @Test
-    public void testPublishTimePredicatePushdown() throws Exception {
-
+    @Test(dataProvider = "rewriteNamespaceDelimiter")
+    public void testPublishTimePredicatePushdown(String delimiter) throws Exception {
+        updateRewriteNamespaceDelimiterIfNeeded(delimiter);
         TopicName topicName = TOPIC_1;
 
         setup();
@@ -226,9 +227,9 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
 
     }
 
-    @Test
-    public void testPublishTimePredicatePushdownPartitionedTopic() throws Exception {
-
+    @Test(dataProvider = "rewriteNamespaceDelimiter")
+    public void testPublishTimePredicatePushdownPartitionedTopic(String delimiter) throws Exception {
+        updateRewriteNamespaceDelimiterIfNeeded(delimiter);
         TopicName topicName = PARTITIONED_TOPIC_1;
 
         setup();