You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/08/10 09:14:23 UTC

[pulsar] branch master updated: [pulsar-sql] Handle schema not found (#4890)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2069f76  [pulsar-sql] Handle schema not found (#4890)
2069f76 is described below

commit 2069f761753940ed6a1faca8999af70036f20fd6
Author: lipenghui <pe...@apache.org>
AuthorDate: Sat Aug 10 17:14:19 2019 +0800

    [pulsar-sql] Handle schema not found (#4890)
    
    * Handle get-schema 404 (schema not found) in Pulsar SQL (table metadata and get splits)
    
    * Fix unit test.
    
    * add defaultSchema()
    
    * use Schema.BYTES.getSchemaInfo()
    
    * add unit test
    
    * rebase and fix unit tests
---
 .../apache/pulsar/sql/presto/PulsarMetadata.java   | 14 ++++++------
 .../pulsar/sql/presto/PulsarSchemaHandlers.java    |  5 +++++
 .../pulsar/sql/presto/PulsarSplitManager.java      | 10 +++++----
 .../pulsar/sql/presto/TestPulsarConnector.java     | 14 ++++++++++--
 .../pulsar/sql/presto/TestPulsarMetadata.java      |  6 +++---
 .../pulsar/sql/presto/TestPulsarSplitManager.java  | 25 +++++++++++++++++++++-
 6 files changed, 57 insertions(+), 17 deletions(-)

diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
index 2ee4a41..dc9a479 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
@@ -311,17 +311,17 @@ public class PulsarMetadata implements ConnectorMetadata {
                     String.format("%s/%s", namespace, schemaTableName.getTableName()));
         } catch (PulsarAdminException e) {
             if (e.getStatusCode() == 404) {
-                // to indicate that we can't read from topic because there is no schema
-                return null;
+                // use default schema because there is no schema
+                schemaInfo = PulsarSchemaHandlers.defaultSchema();
             } else if (e.getStatusCode() == 401) {
                 throw new PrestoException(QUERY_REJECTED,
                         String.format("Failed to get pulsar topic schema information for topic %s/%s: Unauthorized",
                                 namespace, schemaTableName.getTableName()));
+            } else {
+                throw new RuntimeException("Failed to get pulsar topic schema information for topic "
+                        + String.format("%s/%s", namespace, schemaTableName.getTableName())
+                        + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
             }
-
-            throw new RuntimeException("Failed to get pulsar topic schema information for topic "
-                    + String.format("%s/%s", namespace, schemaTableName.getTableName())
-                    + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
         }
         List<ColumnMetadata> handles = getPulsarColumns(
                 topicName, schemaInfo, withInternalColumns
@@ -355,7 +355,7 @@ public class PulsarMetadata implements ConnectorMetadata {
         ColumnMetadata valueColumn = new PulsarColumnMetadata(
                 "__value__",
                 convertPulsarType(schemaInfo.getType()),
-                null, null, false, false,
+                "The value of the message with primitive type schema", null, false, false,
                 new String[0],
                 new Integer[0]);
 
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java
index 6d304df..c0282df 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java
@@ -23,6 +23,8 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 
 import com.facebook.presto.spi.PrestoException;
 import java.util.List;
+
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
 class PulsarSchemaHandlers {
@@ -50,4 +52,7 @@ class PulsarSchemaHandlers {
         }
     }
 
+    static SchemaInfo defaultSchema() {
+        return Schema.BYTES.getSchemaInfo();
+    }
 }
\ No newline at end of file
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
index 8531df0..06bc120 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
@@ -111,11 +111,13 @@ public class PulsarSplitManager implements ConnectorSplitManager {
                 throw new PrestoException(QUERY_REJECTED,
                         String.format("Failed to get pulsar topic schema for topic %s/%s: Unauthorized",
                                 namespace, tableHandle.getTableName()));
+            } else if (e.getStatusCode() == 404) {
+                schemaInfo = PulsarSchemaHandlers.defaultSchema();
+            } else {
+                throw new RuntimeException("Failed to get pulsar topic schema for topic "
+                        + String.format("%s/%s", namespace, tableHandle.getTableName())
+                        + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
             }
-
-            throw new RuntimeException("Failed to get pulsar topic schema for topic "
-                    + String.format("%s/%s", namespace, tableHandle.getTableName())
-                    + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
         }
 
         Collection<PulsarSplit> splits;
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index 030876c..cd81442 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -130,6 +130,8 @@ public abstract class TestPulsarConnector {
     protected static final TopicName TOPIC_4 = TopicName.get("persistent", NAMESPACE_NAME_3, "topic-1");
     protected static final TopicName TOPIC_5 = TopicName.get("persistent", NAMESPACE_NAME_4, "topic-1");
     protected static final TopicName TOPIC_6 = TopicName.get("persistent", NAMESPACE_NAME_4, "topic-2");
+    protected static final TopicName NON_SCHEMA_TOPIC = TopicName.get(
+        "persistent", NAMESPACE_NAME_2, "non-schema-topic");
 
 
     protected static final TopicName PARTITIONED_TOPIC_1 = TopicName.get("persistent", NAMESPACE_NAME_1,
@@ -209,6 +211,7 @@ public abstract class TestPulsarConnector {
             topicNames.add(TOPIC_4);
             topicNames.add(TOPIC_5);
             topicNames.add(TOPIC_6);
+            topicNames.add(NON_SCHEMA_TOPIC);
 
             partitionedTopicNames = new LinkedList<>();
             partitionedTopicNames.add(PARTITIONED_TOPIC_1);
@@ -274,6 +277,7 @@ public abstract class TestPulsarConnector {
             topicsToNumEntries.put(TOPIC_5.getSchemaName(), 8000L);
             topicsToNumEntries.put(TOPIC_6.getSchemaName(), 1L);
 
+            topicsToNumEntries.put(NON_SCHEMA_TOPIC.getSchemaName(), 8000L);
             topicsToNumEntries.put(PARTITIONED_TOPIC_1.getSchemaName(), 1233L);
             topicsToNumEntries.put(PARTITIONED_TOPIC_2.getSchemaName(), 8000L);
             topicsToNumEntries.put(PARTITIONED_TOPIC_3.getSchemaName(), 100L);
@@ -550,13 +554,15 @@ public abstract class TestPulsarConnector {
             allTopics.addAll(partitionedTopicNames);
 
             for (TopicName topicName : allTopics) {
-                splits.put(topicName, new PulsarSplit(0, pulsarConnectorId.toString(),
+                if (topicsToSchemas.containsKey(topicName.getSchemaName())) {
+                    splits.put(topicName, new PulsarSplit(0, pulsarConnectorId.toString(),
                         topicName.getNamespace(), topicName.getLocalName(),
                         topicsToNumEntries.get(topicName.getSchemaName()),
                         new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()),
                         topicsToSchemas.get(topicName.getSchemaName()).getType(),
                         0, topicsToNumEntries.get(topicName.getSchemaName()),
                         0, 0, TupleDomain.all(), new HashMap<>()));
+                }
             }
 
             fooFunctions = new HashMap<>();
@@ -742,7 +748,11 @@ public abstract class TestPulsarConnector {
             public SchemaInfo answer(InvocationOnMock invocationOnMock) throws Throwable {
                 Object[] args = invocationOnMock.getArguments();
                 String topic = (String) args[0];
-                return topicsToSchemas.get(topic);
+                if (topicsToSchemas.get(topic) != null) {
+                    return topicsToSchemas.get(topic);
+                } else {
+                    throw new PulsarAdminException(new ClientErrorException(Response.status(404).build()));
+                }
             }
         });
 
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
index 2a1a58c..d39e403 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
@@ -45,6 +45,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
 import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
@@ -104,7 +105,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
     public void testGetTableMetadata(String delimiter) {
         updateRewriteNamespaceDelimiterIfNeeded(delimiter);
         List<TopicName> allTopics = new LinkedList<>();
-        allTopics.addAll(topicNames);
+        allTopics.addAll(topicNames.stream().filter(topicName -> !topicName.equals(NON_SCHEMA_TOPIC)).collect(Collectors.toList()));
         allTopics.addAll(partitionedTopicNames);
 
         for (TopicName topic : allTopics) {
@@ -120,7 +121,6 @@ public class TestPulsarMetadata extends TestPulsarConnector {
 
             assertEquals(tableMetadata.getTable().getSchemaName(), topic.getNamespace());
             assertEquals(tableMetadata.getTable().getTableName(), topic.getLocalName());
-
             assertEquals(tableMetadata.getColumns().size(),
                     fooColumnHandles.size());
 
@@ -200,7 +200,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
 
         ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
                 pulsarTableHandle);
-        assertEquals(tableMetadata.getColumns().size(), 0);
+        assertEquals(tableMetadata.getColumns().size(), PulsarInternalColumn.getInternalFields().size() + 1);
     }
 
     @Test(dataProvider = "rewriteNamespaceDelimiter")
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
index e442522..ef36148 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java
@@ -36,6 +36,7 @@ import org.testng.annotations.Test;
 
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -51,6 +52,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertNotNull;
 
 @Test(singleThreaded = true)
 public class TestPulsarSplitManager extends TestPulsarConnector {
@@ -73,7 +75,9 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
     @Test(dataProvider = "rewriteNamespaceDelimiter")
     public void testTopic(String delimiter) throws Exception {
         updateRewriteNamespaceDelimiterIfNeeded(delimiter);
-        for (TopicName topicName : topicNames) {
+        List<TopicName> topics = new LinkedList<>();
+        topics.addAll(topicNames.stream().filter(topicName -> !topicName.equals(NON_SCHEMA_TOPIC)).collect(Collectors.toList()));
+        for (TopicName topicName : topics) {
             setup();
             log.info("!----- topic: %s -----!", topicName);
             PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(pulsarConnectorId.toString(),
@@ -378,5 +382,24 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
 
     }
 
+    @Test(dataProvider = "rewriteNamespaceDelimiter")
+    public void testGetSplitNonSchema(String delimiter) throws Exception {
+        updateRewriteNamespaceDelimiterIfNeeded(delimiter);
+        TopicName topicName = NON_SCHEMA_TOPIC;
+        setup();
+        log.info("!----- topic: %s -----!", topicName);
+        PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(pulsarConnectorId.toString(),
+            topicName.getNamespace(),
+            topicName.getLocalName(),
+            topicName.getLocalName());
 
+        Map<ColumnHandle, Domain> domainMap = new HashMap<>();
+        TupleDomain<ColumnHandle> tupleDomain = TupleDomain.withColumnDomains(domainMap);
+
+        PulsarTableLayoutHandle pulsarTableLayoutHandle = new PulsarTableLayoutHandle(pulsarTableHandle, tupleDomain);
+        ConnectorSplitSource connectorSplitSource = this.pulsarSplitManager.getSplits(
+            mock(ConnectorTransactionHandle.class), mock(ConnectorSession.class),
+            pulsarTableLayoutHandle, null);
+        assertNotNull(connectorSplitSource);
+    }
 }