You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/07/22 14:59:52 UTC
[pulsar] branch master updated: Add options to rewrite namespace
delimiter for pulsar sql. (#4749)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6ddd51f Add options to rewrite namespace delimiter for pulsar sql. (#4749)
6ddd51f is described below
commit 6ddd51ff45999bd7daa238073e9bcfd87d1df16a
Author: lipenghui <pe...@apache.org>
AuthorDate: Mon Jul 22 22:59:43 2019 +0800
Add options to rewrite namespace delimiter for pulsar sql. (#4749)
### Motivation
Fix #4732
### Modifications
Add options to rewrite the namespace delimiter; rewriting is disabled by default.
With namespace delimiter rewriting enabled, Pulsar SQL works well with Superset:
<img width="1279" alt="superset" src="https://user-images.githubusercontent.com/12592133/61385412-f0f35700-a8e4-11e9-87b2-a31b62128b58.png">
### Does this pull request potentially affect one of the following parts:
*If `yes` was chosen, please highlight the changes*
- Dependencies (does it add or upgrade a dependency): (no)
- The public API: (no)
- The schema: (no)
- The default values of configurations: (no)
- The wire protocol: (no)
- The rest endpoints: (no)
- The admin cli options: (no)
- Anything that affects deployment: (no)
### Documentation
- Does this pull request introduce a new feature? (no)
---
conf/presto/catalog/pulsar.properties | 7 +-
.../apache/pulsar/common/naming/NamedEntity.java | 2 +-
.../pulsar/sql/presto/PulsarConnectorConfig.java | 33 +++++++++
.../pulsar/sql/presto/PulsarConnectorFactory.java | 4 +-
.../pulsar/sql/presto/PulsarConnectorUtils.java | 14 ++++
.../apache/pulsar/sql/presto/PulsarMetadata.java | 27 +++++---
.../pulsar/sql/presto/PulsarSplitManager.java | 11 +--
.../pulsar/sql/presto/TestPulsarConnector.java | 18 +++++
.../sql/presto/TestPulsarConnectorConfig.java | 52 ++++++++++++++
.../pulsar/sql/presto/TestPulsarMetadata.java | 81 +++++++++++++---------
.../pulsar/sql/presto/TestPulsarSplitManager.java | 23 +++---
11 files changed, 210 insertions(+), 62 deletions(-)
diff --git a/conf/presto/catalog/pulsar.properties b/conf/presto/catalog/pulsar.properties
index 7f191e5..9387296 100644
--- a/conf/presto/catalog/pulsar.properties
+++ b/conf/presto/catalog/pulsar.properties
@@ -30,7 +30,12 @@ pulsar.target-num-splits=2
# max message queue size
pulsar.max-split-message-queue-size=10000
# max entry queue size
-pulsar.max-split-entry-queue-size = 1000
+pulsar.max-split-entry-queue-size=1000
+# Rewrite namespace delimiter
+# Warn: avoid using symbols allowed by Namespace (a-zA-Z_0-9 -=:%)
+# to prevent erroneous rewriting
+pulsar.namespace-delimiter-rewrite-enable=false
+pulsar.rewrite-namespace-delimiter=/
####### TIERED STORAGE OFFLOADER CONFIGS #######
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamedEntity.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamedEntity.java
index e5180d6..8234f9b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamedEntity.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamedEntity.java
@@ -31,7 +31,7 @@ public class NamedEntity {
// allowed characters for property, namespace, cluster and topic names are
// alphanumeric (a-zA-Z_0-9) and these special chars -=:.
// % is allowed as part of valid URL encoding
- private static final Pattern NAMED_ENTITY_PATTERN = Pattern.compile("^[-=:.\\w]*$");
+ public static final Pattern NAMED_ENTITY_PATTERN = Pattern.compile("^[-=:.\\w]*$");
public static void checkName(String name) throws IllegalArgumentException {
Matcher m = NAMED_ENTITY_PATTERN.matcher(name);
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
index a6b625f..b10c040 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
@@ -23,12 +23,14 @@ import io.airlift.configuration.Config;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.bookkeeper.stats.NullStatsProvider;
+import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.protocol.Commands;
import javax.validation.constraints.NotNull;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.regex.Matcher;
public class PulsarConnectorConfig implements AutoCloseable {
@@ -40,8 +42,12 @@ public class PulsarConnectorConfig implements AutoCloseable {
private int maxSplitEntryQueueSize = 1000;
private int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
private String statsProvider = NullStatsProvider.class.getName();
+
private Map<String, String> statsProviderConfigs = new HashMap<>();
+ private boolean namespaceDelimiterRewriteEnable = false;
+ private String rewriteNamespaceDelimiter = "/";
+
/**** --- Ledger Offloading --- ****/
private String managedLedgerOffloadDriver = null;
private int managedLedgerOffloadMaxThreads = 2;
@@ -191,6 +197,33 @@ public class PulsarConnectorConfig implements AutoCloseable {
return this;
}
+ public String getRewriteNamespaceDelimiter() {
+ return rewriteNamespaceDelimiter;
+ }
+
+ @Config("pulsar.rewrite-namespace-delimiter")
+ public PulsarConnectorConfig setRewriteNamespaceDelimiter(String rewriteNamespaceDelimiter) {
+ Matcher m = NamedEntity.NAMED_ENTITY_PATTERN.matcher(rewriteNamespaceDelimiter);
+ if (m.matches()) {
+ throw new IllegalArgumentException(
+ "Can't use " + rewriteNamespaceDelimiter + "as delimiter, "
+ + "because delimiter must contain characters which name of namespace not allowed"
+ );
+ }
+ this.rewriteNamespaceDelimiter = rewriteNamespaceDelimiter;
+ return this;
+ }
+
+ public boolean getNamespaceDelimiterRewriteEnable() {
+ return namespaceDelimiterRewriteEnable;
+ }
+
+ @Config("pulsar.namespace-delimiter-rewrite-enable")
+ public PulsarConnectorConfig setNamespaceDelimiterRewriteEnable(boolean namespaceDelimiterRewriteEnable) {
+ this.namespaceDelimiterRewriteEnable = namespaceDelimiterRewriteEnable;
+ return this;
+ }
+
@NotNull
public PulsarAdmin getPulsarAdmin() throws PulsarClientException {
if (this.pulsarAdmin == null) {
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorFactory.java
index c119a9c..0719e8e 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorFactory.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorFactory.java
@@ -49,7 +49,9 @@ public class PulsarConnectorFactory implements ConnectorFactory {
@Override
public Connector create(String connectorId, Map<String, String> config, ConnectorContext context) {
requireNonNull(config, "requiredConfig is null");
- log.debug("Creating Pulsar connector with configs: %s", config);
+ if (log.isDebugEnabled()) {
+ log.debug("Creating Pulsar connector with configs: %s", config);
+ }
try {
// A plugin is not required to use Guice; it is just very convenient
Bootstrap app = new Bootstrap(
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java
index ee256f6..b14cb87 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java
@@ -83,4 +83,18 @@ public class PulsarConnectorUtils {
}
return properties;
}
+
+
+ public static String rewriteNamespaceDelimiterIfNeeded(String namespace, PulsarConnectorConfig config) {
+ return config.getNamespaceDelimiterRewriteEnable()
+ ? namespace.replace("/", config.getRewriteNamespaceDelimiter())
+ : namespace;
+ }
+
+ public static String restoreNamespaceDelimiterIfNeeded(String namespace, PulsarConnectorConfig config) {
+ return config.getNamespaceDelimiterRewriteEnable()
+ ? namespace.replace(config.getRewriteNamespaceDelimiter(), "/")
+ : namespace;
+ }
+
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
index cce44b2..3647c1b 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
@@ -66,6 +66,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.Stack;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
@@ -75,11 +76,14 @@ import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
import static java.util.Objects.requireNonNull;
import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertColumnHandle;
import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertTableHandle;
+import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded;
+import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded;
public class PulsarMetadata implements ConnectorMetadata {
private final String connectorId;
private final PulsarAdmin pulsarAdmin;
+ private final PulsarConnectorConfig pulsarConnectorConfig;
private static final String INFORMATION_SCHEMA = "information_schema";
@@ -88,6 +92,7 @@ public class PulsarMetadata implements ConnectorMetadata {
@Inject
public PulsarMetadata(PulsarConnectorId connectorId, PulsarConnectorConfig pulsarConnectorConfig) {
this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+ this.pulsarConnectorConfig = pulsarConnectorConfig;
try {
this.pulsarAdmin = pulsarConnectorConfig.getPulsarAdmin();
} catch (PulsarClientException e) {
@@ -101,7 +106,8 @@ public class PulsarMetadata implements ConnectorMetadata {
try {
List<String> tenants = pulsarAdmin.tenants().getTenants();
for (String tenant : tenants) {
- prestoSchemas.addAll(pulsarAdmin.namespaces().getNamespaces(tenant));
+ prestoSchemas.addAll(pulsarAdmin.namespaces().getNamespaces(tenant).stream().map(namespace ->
+ rewriteNamespaceDelimiterIfNeeded(namespace, pulsarConnectorConfig)).collect(Collectors.toList()));
}
} catch (PulsarAdminException e) {
throw new RuntimeException("Failed to get schemas from pulsar: "
@@ -157,7 +163,7 @@ public class PulsarMetadata implements ConnectorMetadata {
} else {
List<String> pulsarTopicList = null;
try {
- pulsarTopicList = this.pulsarAdmin.topics().getList(schemaNameOrNull);
+ pulsarTopicList = this.pulsarAdmin.topics().getList(restoreNamespaceDelimiterIfNeeded(schemaNameOrNull, pulsarConnectorConfig));
} catch (PulsarAdminException e) {
if (e.getStatusCode() == 404) {
log.warn("Schema " + schemaNameOrNull + " does not exsit");
@@ -250,28 +256,29 @@ public class PulsarMetadata implements ConnectorMetadata {
if (schemaTableName.getSchemaName().equals(INFORMATION_SCHEMA)) {
return null;
}
+ String namespace = restoreNamespaceDelimiterIfNeeded(schemaTableName.getSchemaName(), pulsarConnectorConfig);
TopicName topicName = TopicName.get(
- String.format("%s/%s", schemaTableName.getSchemaName(), schemaTableName.getTableName()));
+ String.format("%s/%s", namespace, schemaTableName.getTableName()));
List<String> topics;
try {
if (!PulsarConnectorUtils.isPartitionedTopic(topicName, this.pulsarAdmin)) {
- topics = this.pulsarAdmin.topics().getList(schemaTableName.getSchemaName());
+ topics = this.pulsarAdmin.topics().getList(namespace);
} else {
- topics = this.pulsarAdmin.topics().getPartitionedTopicList((schemaTableName.getSchemaName()));
+ topics = this.pulsarAdmin.topics().getPartitionedTopicList(namespace);
}
} catch (PulsarAdminException e) {
if (e.getStatusCode() == 404) {
- throw new PrestoException(NOT_FOUND, "Schema " + schemaTableName.getSchemaName() + " does not exist");
+ throw new PrestoException(NOT_FOUND, "Schema " + namespace + " does not exist");
}
- throw new RuntimeException("Failed to get topics in schema " + schemaTableName.getSchemaName()
+ throw new RuntimeException("Failed to get topics in schema " + namespace
+ ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
}
if (!topics.contains(topicName.toString())) {
log.error("Table %s not found",
- String.format("%s/%s", schemaTableName.getSchemaName(),
+ String.format("%s/%s", namespace,
schemaTableName.getTableName()));
throw new TableNotFoundException(schemaTableName);
}
@@ -279,14 +286,14 @@ public class PulsarMetadata implements ConnectorMetadata {
SchemaInfo schemaInfo;
try {
schemaInfo = this.pulsarAdmin.schemas().getSchemaInfo(
- String.format("%s/%s", schemaTableName.getSchemaName(), schemaTableName.getTableName()));
+ String.format("%s/%s", namespace, schemaTableName.getTableName()));
} catch (PulsarAdminException e) {
if (e.getStatusCode() == 404) {
// to indicate that we can't read from topic because there is no schema
return null;
}
throw new RuntimeException("Failed to get schema information for topic "
- + String.format("%s/%s", schemaTableName.getSchemaName(), schemaTableName.getTableName())
+ + String.format("%s/%s", namespace, schemaTableName.getTableName())
+ ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
index 445564a..53195a9 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
@@ -60,6 +60,7 @@ import java.util.List;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static org.apache.bookkeeper.mledger.ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries;
+import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded;
public class PulsarSplitManager implements ConnectorSplitManager {
@@ -94,16 +95,18 @@ public class PulsarSplitManager implements ConnectorSplitManager {
PulsarTableHandle tableHandle = layoutHandle.getTable();
TupleDomain<ColumnHandle> tupleDomain = layoutHandle.getTupleDomain();
- TopicName topicName = TopicName.get("persistent", NamespaceName.get(tableHandle.getSchemaName()),
+ String namespace = restoreNamespaceDelimiterIfNeeded(tableHandle.getSchemaName(), pulsarConnectorConfig);
+ TopicName topicName = TopicName.get("persistent", NamespaceName.get(namespace),
tableHandle.getTableName());
SchemaInfo schemaInfo;
+
try {
schemaInfo = this.pulsarAdmin.schemas().getSchemaInfo(
- String.format("%s/%s", tableHandle.getSchemaName(), tableHandle.getTableName()));
+ String.format("%s/%s", namespace, tableHandle.getTableName()));
} catch (PulsarAdminException e) {
throw new RuntimeException("Failed to get schema for topic "
- + String.format("%s/%s", tableHandle.getSchemaName(), tableHandle.getTableName())
+ + String.format("%s/%s", namespace, tableHandle.getTableName())
+ ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
}
@@ -258,7 +261,7 @@ public class PulsarSplitManager implements ConnectorSplitManager {
PositionImpl endPosition = (PositionImpl) readOnlyCursor.getReadPosition();
splits.add(new PulsarSplit(i, this.connectorId,
- tableHandle.getSchemaName(),
+ restoreNamespaceDelimiterIfNeeded(tableHandle.getSchemaName(), pulsarConnectorConfig),
tableName,
entriesForSplit,
new String(schemaInfo.getSchema()),
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index fe3ac57..6a5cd4f 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -38,6 +38,7 @@ import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.impl.ReadOnlyCursorImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -63,6 +64,7 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.time.LocalDate;
@@ -934,4 +936,20 @@ public abstract class TestPulsarConnector {
public void cleanup() {
completedBytes = 0L;
}
+
+ @DataProvider(name = "rewriteNamespaceDelimiter")
+ public static Object[][] serviceUrls() {
+ return new Object[][] {
+ { "|" }, { null }
+ };
+ }
+
+ protected void updateRewriteNamespaceDelimiterIfNeeded(String delimiter) {
+ if (StringUtils.isNotBlank(delimiter)) {
+ pulsarConnectorConfig.setNamespaceDelimiterRewriteEnable(true);
+ pulsarConnectorConfig.setRewriteNamespaceDelimiter(delimiter);
+ } else {
+ pulsarConnectorConfig.setNamespaceDelimiterRewriteEnable(false);
+ }
+ }
}
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java
new file mode 100644
index 0000000..82b8c97
--- /dev/null
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestPulsarConnectorConfig {
+
+ @Test
+ public void testDefaultNamespaceDelimiterRewrite() {
+ PulsarConnectorConfig connectorConfig = new PulsarConnectorConfig();
+ Assert.assertFalse(connectorConfig.getNamespaceDelimiterRewriteEnable());
+ Assert.assertEquals("/", connectorConfig.getRewriteNamespaceDelimiter());
+ }
+
+ @Test
+ public void testNamespaceRewriteDelimiterRestriction() {
+ PulsarConnectorConfig connectorConfig = new PulsarConnectorConfig();
+ try {
+ connectorConfig.setRewriteNamespaceDelimiter("-=:.Az09_");
+ } catch (Exception e) {
+ Assert.assertTrue(e instanceof IllegalArgumentException);
+ }
+ connectorConfig.setRewriteNamespaceDelimiter("|");
+ Assert.assertEquals("|", (connectorConfig.getRewriteNamespaceDelimiter()));
+ connectorConfig.setRewriteNamespaceDelimiter("||");
+ Assert.assertEquals("||", (connectorConfig.getRewriteNamespaceDelimiter()));
+ connectorConfig.setRewriteNamespaceDelimiter("$");
+ Assert.assertEquals("$", (connectorConfig.getRewriteNamespaceDelimiter()));
+ connectorConfig.setRewriteNamespaceDelimiter("&");
+ Assert.assertEquals("&", (connectorConfig.getRewriteNamespaceDelimiter()));
+ connectorConfig.setRewriteNamespaceDelimiter("--&");
+ Assert.assertEquals("--&", (connectorConfig.getRewriteNamespaceDelimiter()));
+ }
+}
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
index dfd723e..40bf57c 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
@@ -29,6 +29,7 @@ import com.facebook.presto.spi.SchemaTablePrefix;
import com.facebook.presto.spi.TableNotFoundException;
import io.airlift.log.Logger;
import org.apache.avro.Schema;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;
@@ -59,19 +60,29 @@ public class TestPulsarMetadata extends TestPulsarConnector {
private static final Logger log = Logger.get(TestPulsarMetadata.class);
- @Test
- public void testListSchemaNames() {
-
+ @Test(dataProvider = "rewriteNamespaceDelimiter")
+ public void testListSchemaNames(String delimiter) {
+ updateRewriteNamespaceDelimiterIfNeeded(delimiter);
List<String> schemas = this.pulsarMetadata.listSchemaNames(mock(ConnectorSession.class));
- String[] expectedSchemas = {NAMESPACE_NAME_1.toString(), NAMESPACE_NAME_2.toString(),
- NAMESPACE_NAME_3.toString(), NAMESPACE_NAME_4.toString()};
- assertEquals(new HashSet<>(schemas), new HashSet<>(Arrays.asList(expectedSchemas)));
- }
- @Test
- public void testGetTableHandle() {
+ if (StringUtils.isBlank(delimiter)) {
+ String[] expectedSchemas = {NAMESPACE_NAME_1.toString(), NAMESPACE_NAME_2.toString(),
+ NAMESPACE_NAME_3.toString(), NAMESPACE_NAME_4.toString()};
+ assertEquals(new HashSet<>(schemas), new HashSet<>(Arrays.asList(expectedSchemas)));
+ } else {
+ String[] expectedSchemas = {
+ PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded(NAMESPACE_NAME_1.toString(), pulsarConnectorConfig),
+ PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded(NAMESPACE_NAME_2.toString(), pulsarConnectorConfig),
+ PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded(NAMESPACE_NAME_3.toString(), pulsarConnectorConfig),
+ PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded(NAMESPACE_NAME_4.toString(), pulsarConnectorConfig)};
+ assertEquals(new HashSet<>(schemas), new HashSet<>(Arrays.asList(expectedSchemas)));
+ }
+ }
+ @Test(dataProvider = "rewriteNamespaceDelimiter")
+ public void testGetTableHandle(String delimiter) {
+ updateRewriteNamespaceDelimiterIfNeeded(delimiter);
SchemaTableName schemaTableName = new SchemaTableName(TOPIC_1.getNamespace(), TOPIC_1.getLocalName());
ConnectorTableHandle connectorTableHandle
@@ -87,9 +98,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
assertEquals(pulsarTableHandle.getTopicName(), TOPIC_1.getLocalName());
}
- @Test
- public void testGetTableMetadata() {
-
+ @Test(dataProvider = "rewriteNamespaceDelimiter")
+ public void testGetTableMetadata(String delimiter) {
+ updateRewriteNamespaceDelimiterIfNeeded(delimiter);
List<TopicName> allTopics = new LinkedList<>();
allTopics.addAll(topicNames);
allTopics.addAll(partitionedTopicNames);
@@ -131,9 +142,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
}
}
- @Test
- public void testGetTableMetadataWrongSchema() {
-
+ @Test(dataProvider = "rewriteNamespaceDelimiter")
+ public void testGetTableMetadataWrongSchema(String delimiter) {
+ updateRewriteNamespaceDelimiterIfNeeded(delimiter);
PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(
pulsarConnectorId.toString(),
"wrong-tenant/wrong-ns",
@@ -151,9 +162,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
}
}
- @Test
- public void testGetTableMetadataWrongTable() {
-
+ @Test(dataProvider = "rewriteNamespaceDelimiter")
+ public void testGetTableMetadataWrongTable(String delimiter) {
+ updateRewriteNamespaceDelimiterIfNeeded(delimiter);
PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(
pulsarConnectorId.toString(),
TOPIC_1.getNamespace(),
@@ -171,9 +182,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
}
}
- @Test
- public void testGetTableMetadataTableNoSchema() throws PulsarAdminException {
-
+ @Test(dataProvider = "rewriteNamespaceDelimiter")
+ public void testGetTableMetadataTableNoSchema(String delimiter) throws PulsarAdminException {
+ updateRewriteNamespaceDelimiterIfNeeded(delimiter);
when(this.schemas.getSchemaInfo(eq(TOPIC_1.getSchemaName()))).thenThrow(
new PulsarAdminException(new ClientErrorException(Response.Status.NOT_FOUND)));
@@ -190,9 +201,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
assertEquals(tableMetadata.getColumns().size(), 0);
}
- @Test
- public void testGetTableMetadataTableBlankSchema() throws PulsarAdminException {
-
+ @Test(dataProvider = "rewriteNamespaceDelimiter")
+ public void testGetTableMetadataTableBlankSchema(String delimiter) throws PulsarAdminException {
+ updateRewriteNamespaceDelimiterIfNeeded(delimiter);
SchemaInfo badSchemaInfo = new SchemaInfo();
badSchemaInfo.setSchema(new byte[0]);
when(this.schemas.getSchemaInfo(eq(TOPIC_1.getSchemaName()))).thenReturn(badSchemaInfo);
@@ -215,9 +226,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
}
}
- @Test
- public void testGetTableMetadataTableInvalidSchema() throws PulsarAdminException {
-
+ @Test(dataProvider = "rewriteNamespaceDelimiter")
+ public void testGetTableMetadataTableInvalidSchema(String delimiter) throws PulsarAdminException {
+ updateRewriteNamespaceDelimiterIfNeeded(delimiter);
SchemaInfo badSchemaInfo = new SchemaInfo();
badSchemaInfo.setSchema("foo".getBytes());
when(this.schemas.getSchemaInfo(eq(TOPIC_1.getSchemaName()))).thenReturn(badSchemaInfo);
@@ -240,8 +251,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
}
}
- @Test
- public void testListTable() {
+ @Test(dataProvider = "rewriteNamespaceDelimiter")
+ public void testListTable(String delimiter) {
+ updateRewriteNamespaceDelimiterIfNeeded(delimiter);
assertTrue(this.pulsarMetadata.listTables(mock(ConnectorSession.class), null).isEmpty());
assertTrue(this.pulsarMetadata.listTables(mock(ConnectorSession.class), "wrong-tenant/wrong-ns")
.isEmpty());
@@ -256,9 +268,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
NAMESPACE_NAME_4.toString())), new HashSet<>(Arrays.asList(expectedTopics2)));
}
- @Test
- public void testGetColumnHandles() {
-
+ @Test(dataProvider = "rewriteNamespaceDelimiter")
+ public void testGetColumnHandles(String delimiter) {
+ updateRewriteNamespaceDelimiterIfNeeded(delimiter);
PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(pulsarConnectorId.toString(), TOPIC_1.getNamespace(),
TOPIC_1.getLocalName(), TOPIC_1.getLocalName());
Map<String, ColumnHandle> columnHandleMap
@@ -292,8 +304,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
assertTrue(columnHandleMap.isEmpty());
}
- @Test
- public void testListTableColumns() {
+ @Test(dataProvider = "rewriteNamespaceDelimiter")
+ public void testListTableColumns(String delimiter) {
+ updateRewriteNamespaceDelimiterIfNeeded(delimiter);
Map<SchemaTableName, List<ColumnMetadata>> tableColumnsMap
= this.pulsarMetadata.listTableColumns(mock(ConnectorSession.class),
new SchemaTablePrefix(TOPIC_1.getNamespace()));
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
index 253803d..adeaf90 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
@@ -66,9 +66,9 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
}
}
- @Test
- public void testTopic() throws Exception {
-
+ @Test(dataProvider = "rewriteNamespaceDelimiter")
+ public void testTopic(String delimiter) throws Exception {
+ updateRewriteNamespaceDelimiterIfNeeded(delimiter);
for (TopicName topicName : topicNames) {
setup();
log.info("!----- topic: %s -----!", topicName);
@@ -113,8 +113,9 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
}
- @Test
- public void testPartitionedTopic() throws Exception {
+ @Test(dataProvider = "rewriteNamespaceDelimiter")
+ public void testPartitionedTopic(String delimiter) throws Exception {
+ updateRewriteNamespaceDelimiterIfNeeded(delimiter);
for (TopicName topicName : partitionedTopicNames) {
setup();
log.info("!----- topic: %s -----!", topicName);
@@ -169,9 +170,9 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
}).collect(Collectors.toList());
}
- @Test
- public void testPublishTimePredicatePushdown() throws Exception {
-
+ @Test(dataProvider = "rewriteNamespaceDelimiter")
+ public void testPublishTimePredicatePushdown(String delimiter) throws Exception {
+ updateRewriteNamespaceDelimiterIfNeeded(delimiter);
TopicName topicName = TOPIC_1;
setup();
@@ -226,9 +227,9 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
}
- @Test
- public void testPublishTimePredicatePushdownPartitionedTopic() throws Exception {
-
+ @Test(dataProvider = "rewriteNamespaceDelimiter")
+ public void testPublishTimePredicatePushdownPartitionedTopic(String delimiter) throws Exception {
+ updateRewriteNamespaceDelimiterIfNeeded(delimiter);
TopicName topicName = PARTITIONED_TOPIC_1;
setup();