You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/12/16 22:53:47 UTC

[GitHub] jerrypeng closed pull request #3195: Get information_schema for pulsar connector to work

jerrypeng closed pull request #3195: Get information_schema for pulsar connector to work
URL: https://github.com/apache/pulsar/pull/3195
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would otherwise not show it once the pull request is merged):

diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
index a1df7c7b68..34d332e54d 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java
@@ -18,15 +18,14 @@
  */
 package org.apache.pulsar.sql.presto;
 
-import com.google.gson.Gson;
-import com.google.gson.reflect.TypeToken;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import io.airlift.configuration.Config;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsProvider;
 
 import javax.validation.constraints.NotNull;
-import java.lang.reflect.Type;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -125,9 +124,8 @@ public PulsarConnectorConfig setStatsProvider(String statsProvider) {
     }
 
     @Config("pulsar.stats-provider-configs")
-    public PulsarConnectorConfig setStatsProviderConfigs(String statsProviderConfigs) {
-        Type type = new TypeToken<Map<String, String>>(){}.getType();
-        this.statsProviderConfigs = new Gson().fromJson(statsProviderConfigs, type);
+    public PulsarConnectorConfig setStatsProviderConfigs(String statsProviderConfigs) throws IOException {
+        this.statsProviderConfigs = new ObjectMapper().readValue(statsProviderConfigs, Map.class);
         return this;
     }
 
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java
index 388df0eefe..520ee68040 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java
@@ -34,13 +34,8 @@ public static Schema parseSchema(String schemaJson) {
         return parser.parse(schemaJson);
     }
 
-    public static boolean isPartitionedTopic(TopicName topicName, PulsarAdmin pulsarAdmin) {
-        try {
-            return pulsarAdmin.topics().getPartitionedTopicMetadata(topicName.toString()).partitions > 0;
-        } catch (PulsarAdminException e) {
-            throw new RuntimeException("Failed to determine if topic " + topicName + " is partitioned: "
-                    + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
-        }
+    public static boolean isPartitionedTopic(TopicName topicName, PulsarAdmin pulsarAdmin) throws PulsarAdminException {
+        return pulsarAdmin.topics().getPartitionedTopicMetadata(topicName.toString()).partitions > 0;
     }
 
     /**
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
index f283ea24c2..ddc27a4e93 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
@@ -57,6 +57,7 @@
 import org.apache.pulsar.common.schema.SchemaInfo;
 
 import javax.inject.Inject;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -80,6 +81,8 @@
     private final String connectorId;
     private final PulsarAdmin pulsarAdmin;
 
+    private static final String INFORMATION_SCHEMA = "information_schema";
+
     private static final Logger log = Logger.get(PulsarMetadata.class);
 
     @Inject
@@ -133,7 +136,14 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa
 
     @Override
     public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) {
-        return getTableMetadata(convertTableHandle(table).toSchemaTableName(), true);
+        ConnectorTableMetadata connectorTableMetadata;
+        SchemaTableName schemaTableName = convertTableHandle(table).toSchemaTableName();
+        connectorTableMetadata = getTableMetadata(schemaTableName, true);
+        if (connectorTableMetadata == null) {
+            ImmutableList.Builder<ColumnMetadata> builder = ImmutableList.builder();
+            connectorTableMetadata = new ConnectorTableMetadata(schemaTableName, builder.build());
+        }
+        return connectorTableMetadata;
     }
 
     @Override
@@ -141,20 +151,25 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
         ImmutableList.Builder<SchemaTableName> builder = ImmutableList.builder();
 
         if (schemaNameOrNull != null) {
-            List<String> pulsarTopicList = null;
-            try {
-                pulsarTopicList = this.pulsarAdmin.topics().getList(schemaNameOrNull);
-            } catch (PulsarAdminException e) {
-                if (e.getStatusCode() == 404) {
-                    log.warn("Schema " + schemaNameOrNull + " does not exsit");
-                    return builder.build();
+
+            if (schemaNameOrNull.equals(INFORMATION_SCHEMA)) {
+                // no-op for now but add pulsar connector specific system tables here
+            } else {
+                List<String> pulsarTopicList = null;
+                try {
+                    pulsarTopicList = this.pulsarAdmin.topics().getList(schemaNameOrNull);
+                } catch (PulsarAdminException e) {
+                    if (e.getStatusCode() == 404) {
+                        log.warn("Schema " + schemaNameOrNull + " does not exsit");
+                        return builder.build();
+                    }
+                    throw new RuntimeException("Failed to get tables/topics in " + schemaNameOrNull + ": "
+                            + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
+                }
+                if (pulsarTopicList != null) {
+                    pulsarTopicList.forEach(topic -> builder.add(
+                            new SchemaTableName(schemaNameOrNull, TopicName.get(topic).getLocalName())));
                 }
-                throw new RuntimeException("Failed to get tables/topics in " + schemaNameOrNull + ": "
-                        + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
-            }
-            if (pulsarTopicList != null) {
-                pulsarTopicList.forEach(topic -> builder.add(
-                        new SchemaTableName(schemaNameOrNull, TopicName.get(topic).getLocalName())));
             }
         }
         return builder.build();
@@ -165,6 +180,9 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
         PulsarTableHandle pulsarTableHandle = convertTableHandle(tableHandle);
 
         ConnectorTableMetadata tableMetaData = getTableMetadata(pulsarTableHandle.toSchemaTableName(), false);
+        if (tableMetaData == null) {
+            return new HashMap<>();
+        }
 
         ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
 
@@ -224,7 +242,10 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable
         }
 
         for (SchemaTableName tableName : tableNames) {
-            columns.put(tableName, getTableMetadata(tableName, true).getColumns());
+            ConnectorTableMetadata connectorTableMetadata = getTableMetadata(tableName, true);
+            if (connectorTableMetadata != null) {
+                columns.put(tableName, connectorTableMetadata.getColumns());
+            }
         }
 
         return columns.build();
@@ -232,9 +253,11 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable
 
     private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName, boolean withInternalColumns) {
 
-        TopicName topicName;
+        if (schemaTableName.getSchemaName().equals(INFORMATION_SCHEMA)) {
+            return null;
+        }
 
-        topicName = TopicName.get(
+        TopicName topicName = TopicName.get(
                 String.format("%s/%s", schemaTableName.getSchemaName(), schemaTableName.getTableName()));
 
         List<String> topics;
@@ -265,7 +288,8 @@ private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName,
                     String.format("%s/%s", schemaTableName.getSchemaName(), schemaTableName.getTableName()));
         } catch (PulsarAdminException e) {
             if (e.getStatusCode() == 404) {
-                throw new PrestoException(NOT_SUPPORTED, "Topic " + topicName.toString() + " does not have a schema");
+                // to indicate that we can't read from topic because there is no schema
+                return null;
             }
             throw new RuntimeException("Failed to get schema information for topic "
                     + String.format("%s/%s", schemaTableName.getSchemaName(), schemaTableName.getTableName())
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
index 17ce4167ec..0803cd67ea 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
@@ -180,14 +180,10 @@ public void testGetTableMetadataTableNoSchema() throws PulsarAdminException {
                 TOPIC_1.getLocalName()
         );
 
-        try {
-            ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
-                    pulsarTableHandle);
-            Assert.fail("Table without schema should have generated an exception");
-        } catch (PrestoException e) {
-            Assert.assertEquals(e.getErrorCode(), NOT_SUPPORTED.toErrorCode());
-            Assert.assertEquals(e.getMessage(), "Topic persistent://tenant-1/ns-1/topic-1 does not have a schema");
-        }
+
+        ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
+                pulsarTableHandle);
+        Assert.assertEquals(tableMetadata.getColumns().size(), 0);
     }
 
     @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services