You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/08/10 09:14:23 UTC
[pulsar] branch master updated: [pulsar-sql] Handle schema not
found (#4890)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2069f76 [pulsar-sql] Handle schema not found (#4890)
2069f76 is described below
commit 2069f761753940ed6a1faca8999af70036f20fd6
Author: lipenghui <pe...@apache.org>
AuthorDate: Sat Aug 10 17:14:19 2019 +0800
[pulsar-sql] Handle schema not found (#4890)
* Handle get schema 404 in pulsar sql(table meta and get splits)
* Fix unit test.
* add defaultSchema()
* use Schema.BYTES.getSchemaInfo()
* add unit test
* rebase and fix unit tests
---
.../apache/pulsar/sql/presto/PulsarMetadata.java | 14 ++++++------
.../pulsar/sql/presto/PulsarSchemaHandlers.java | 5 +++++
.../pulsar/sql/presto/PulsarSplitManager.java | 10 +++++----
.../pulsar/sql/presto/TestPulsarConnector.java | 14 ++++++++++--
.../pulsar/sql/presto/TestPulsarMetadata.java | 6 +++---
.../pulsar/sql/presto/TestPulsarSplitManager.java | 25 +++++++++++++++++++++-
6 files changed, 57 insertions(+), 17 deletions(-)
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
index 2ee4a41..dc9a479 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
@@ -311,17 +311,17 @@ public class PulsarMetadata implements ConnectorMetadata {
String.format("%s/%s", namespace, schemaTableName.getTableName()));
} catch (PulsarAdminException e) {
if (e.getStatusCode() == 404) {
- // to indicate that we can't read from topic because there is no schema
- return null;
+ // use default schema because there is no schema
+ schemaInfo = PulsarSchemaHandlers.defaultSchema();
} else if (e.getStatusCode() == 401) {
throw new PrestoException(QUERY_REJECTED,
String.format("Failed to get pulsar topic schema information for topic %s/%s: Unauthorized",
namespace, schemaTableName.getTableName()));
+ } else {
+ throw new RuntimeException("Failed to get pulsar topic schema information for topic "
+ + String.format("%s/%s", namespace, schemaTableName.getTableName())
+ + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
}
-
- throw new RuntimeException("Failed to get pulsar topic schema information for topic "
- + String.format("%s/%s", namespace, schemaTableName.getTableName())
- + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
}
List<ColumnMetadata> handles = getPulsarColumns(
topicName, schemaInfo, withInternalColumns
@@ -355,7 +355,7 @@ public class PulsarMetadata implements ConnectorMetadata {
ColumnMetadata valueColumn = new PulsarColumnMetadata(
"__value__",
convertPulsarType(schemaInfo.getType()),
- null, null, false, false,
+ "The value of the message with primitive type schema", null, false, false,
new String[0],
new Integer[0]);
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java
index 6d304df..c0282df 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java
@@ -23,6 +23,8 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import com.facebook.presto.spi.PrestoException;
import java.util.List;
+
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaInfo;
class PulsarSchemaHandlers {
@@ -50,4 +52,7 @@ class PulsarSchemaHandlers {
}
}
+ static SchemaInfo defaultSchema() {
+ return Schema.BYTES.getSchemaInfo();
+ }
}
\ No newline at end of file
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
index 8531df0..06bc120 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
@@ -111,11 +111,13 @@ public class PulsarSplitManager implements ConnectorSplitManager {
throw new PrestoException(QUERY_REJECTED,
String.format("Failed to get pulsar topic schema for topic %s/%s: Unauthorized",
namespace, tableHandle.getTableName()));
+ } else if (e.getStatusCode() == 404) {
+ schemaInfo = PulsarSchemaHandlers.defaultSchema();
+ } else {
+ throw new RuntimeException("Failed to get pulsar topic schema for topic "
+ + String.format("%s/%s", namespace, tableHandle.getTableName())
+ + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
}
-
- throw new RuntimeException("Failed to get pulsar topic schema for topic "
- + String.format("%s/%s", namespace, tableHandle.getTableName())
- + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
}
Collection<PulsarSplit> splits;
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index 030876c..cd81442 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -130,6 +130,8 @@ public abstract class TestPulsarConnector {
protected static final TopicName TOPIC_4 = TopicName.get("persistent", NAMESPACE_NAME_3, "topic-1");
protected static final TopicName TOPIC_5 = TopicName.get("persistent", NAMESPACE_NAME_4, "topic-1");
protected static final TopicName TOPIC_6 = TopicName.get("persistent", NAMESPACE_NAME_4, "topic-2");
+ protected static final TopicName NON_SCHEMA_TOPIC = TopicName.get(
+ "persistent", NAMESPACE_NAME_2, "non-schema-topic");
protected static final TopicName PARTITIONED_TOPIC_1 = TopicName.get("persistent", NAMESPACE_NAME_1,
@@ -209,6 +211,7 @@ public abstract class TestPulsarConnector {
topicNames.add(TOPIC_4);
topicNames.add(TOPIC_5);
topicNames.add(TOPIC_6);
+ topicNames.add(NON_SCHEMA_TOPIC);
partitionedTopicNames = new LinkedList<>();
partitionedTopicNames.add(PARTITIONED_TOPIC_1);
@@ -274,6 +277,7 @@ public abstract class TestPulsarConnector {
topicsToNumEntries.put(TOPIC_5.getSchemaName(), 8000L);
topicsToNumEntries.put(TOPIC_6.getSchemaName(), 1L);
+ topicsToNumEntries.put(NON_SCHEMA_TOPIC.getSchemaName(), 8000L);
topicsToNumEntries.put(PARTITIONED_TOPIC_1.getSchemaName(), 1233L);
topicsToNumEntries.put(PARTITIONED_TOPIC_2.getSchemaName(), 8000L);
topicsToNumEntries.put(PARTITIONED_TOPIC_3.getSchemaName(), 100L);
@@ -550,13 +554,15 @@ public abstract class TestPulsarConnector {
allTopics.addAll(partitionedTopicNames);
for (TopicName topicName : allTopics) {
- splits.put(topicName, new PulsarSplit(0, pulsarConnectorId.toString(),
+ if (topicsToSchemas.containsKey(topicName.getSchemaName())) {
+ splits.put(topicName, new PulsarSplit(0, pulsarConnectorId.toString(),
topicName.getNamespace(), topicName.getLocalName(),
topicsToNumEntries.get(topicName.getSchemaName()),
new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()),
topicsToSchemas.get(topicName.getSchemaName()).getType(),
0, topicsToNumEntries.get(topicName.getSchemaName()),
0, 0, TupleDomain.all(), new HashMap<>()));
+ }
}
fooFunctions = new HashMap<>();
@@ -742,7 +748,11 @@ public abstract class TestPulsarConnector {
public SchemaInfo answer(InvocationOnMock invocationOnMock) throws Throwable {
Object[] args = invocationOnMock.getArguments();
String topic = (String) args[0];
- return topicsToSchemas.get(topic);
+ if (topicsToSchemas.get(topic) != null) {
+ return topicsToSchemas.get(topic);
+ } else {
+ throw new PulsarAdminException(new ClientErrorException(Response.status(404).build()));
+ }
}
});
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
index 2a1a58c..d39e403 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
@@ -45,6 +45,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
@@ -104,7 +105,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
public void testGetTableMetadata(String delimiter) {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
List<TopicName> allTopics = new LinkedList<>();
- allTopics.addAll(topicNames);
+ allTopics.addAll(topicNames.stream().filter(topicName -> !topicName.equals(NON_SCHEMA_TOPIC)).collect(Collectors.toList()));
allTopics.addAll(partitionedTopicNames);
for (TopicName topic : allTopics) {
@@ -120,7 +121,6 @@ public class TestPulsarMetadata extends TestPulsarConnector {
assertEquals(tableMetadata.getTable().getSchemaName(), topic.getNamespace());
assertEquals(tableMetadata.getTable().getTableName(), topic.getLocalName());
-
assertEquals(tableMetadata.getColumns().size(),
fooColumnHandles.size());
@@ -200,7 +200,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
pulsarTableHandle);
- assertEquals(tableMetadata.getColumns().size(), 0);
+ assertEquals(tableMetadata.getColumns().size(), PulsarInternalColumn.getInternalFields().size() + 1);
}
@Test(dataProvider = "rewriteNamespaceDelimiter")
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
index e442522..ef36148 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
@@ -36,6 +36,7 @@ import org.testng.annotations.Test;
import java.util.Collection;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -51,6 +52,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertNotNull;
@Test(singleThreaded = true)
public class TestPulsarSplitManager extends TestPulsarConnector {
@@ -73,7 +75,9 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
@Test(dataProvider = "rewriteNamespaceDelimiter")
public void testTopic(String delimiter) throws Exception {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
- for (TopicName topicName : topicNames) {
+ List<TopicName> topics = new LinkedList<>();
+ topics.addAll(topicNames.stream().filter(topicName -> !topicName.equals(NON_SCHEMA_TOPIC)).collect(Collectors.toList()));
+ for (TopicName topicName : topics) {
setup();
log.info("!----- topic: %s -----!", topicName);
PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(pulsarConnectorId.toString(),
@@ -378,5 +382,24 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
}
+ @Test(dataProvider = "rewriteNamespaceDelimiter")
+ public void testGetSplitNonSchema(String delimiter) throws Exception {
+ updateRewriteNamespaceDelimiterIfNeeded(delimiter);
+ TopicName topicName = NON_SCHEMA_TOPIC;
+ setup();
+ log.info("!----- topic: %s -----!", topicName);
+ PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(pulsarConnectorId.toString(),
+ topicName.getNamespace(),
+ topicName.getLocalName(),
+ topicName.getLocalName());
+ Map<ColumnHandle, Domain> domainMap = new HashMap<>();
+ TupleDomain<ColumnHandle> tupleDomain = TupleDomain.withColumnDomains(domainMap);
+
+ PulsarTableLayoutHandle pulsarTableLayoutHandle = new PulsarTableLayoutHandle(pulsarTableHandle, tupleDomain);
+ ConnectorSplitSource connectorSplitSource = this.pulsarSplitManager.getSplits(
+ mock(ConnectorTransactionHandle.class), mock(ConnectorSession.class),
+ pulsarTableLayoutHandle, null);
+ assertNotNull(connectorSplitSource);
+ }
}