You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/07/24 01:53:12 UTC

[pulsar] branch master updated: Pulsar SQL supports pulsar's primitive schema (#4728)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ab35b0  Pulsar SQL supports pulsar's primitive schema (#4728)
1ab35b0 is described below

commit 1ab35b01bcf931eb3b65a5660eca62a9ac4ceb1d
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Wed Jul 24 09:53:07 2019 +0800

    Pulsar SQL supports pulsar's primitive schema (#4728)
    
    ### Motivation
    Continues the work of PR #4151.
---
 .../apache/pulsar/common/schema/SchemaType.java    |  42 ++++++
 .../apache/pulsar/sql/presto/PulsarMetadata.java   |  97 +++++++++++-
 .../sql/presto/PulsarPrimitiveSchemaHandler.java   |  65 +++++++++
 .../pulsar/sql/presto/PulsarRecordCursor.java      |  23 +--
 .../pulsar/sql/presto/PulsarSchemaHandlers.java    |  53 +++++++
 .../org/apache/pulsar/sql/presto/PulsarSplit.java  |  17 ++-
 .../pulsar/sql/presto/PulsarSplitManager.java      |   3 +-
 .../pulsar/sql/presto/TestPulsarConnector.java     |   2 +-
 .../pulsar/sql/presto/TestPulsarMetadata.java      |   4 +
 .../presto/TestPulsarPrimitiveSchemaHandler.java   | 162 +++++++++++++++++++++
 10 files changed, 442 insertions(+), 26 deletions(-)

diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
index 138b3ec..67cb090 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
@@ -175,4 +175,46 @@ public enum SchemaType {
           default: return NONE;
         }
       }
+
+
+    /**
+     * Tells whether this schema type is a primitive type.
+     *
+     * @return the result of {@link #isPrimitiveType(SchemaType)} applied to this constant
+     */
+    public boolean isPrimitive() {
+        return SchemaType.isPrimitiveType(this);
+    }
+
+    /**
+     * Tells whether this schema type is a struct type.
+     *
+     * @return the result of {@link #isStructType(SchemaType)} applied to this constant
+     */
+    public boolean isStruct() {
+        return SchemaType.isStructType(this);
+    }
+
+    /**
+     * Checks whether the given schema type is a primitive type: {@code NONE},
+     * {@code BYTES}, {@code STRING}, {@code BOOLEAN}, the {@code INT8}..{@code INT64}
+     * integers, {@code FLOAT}, {@code DOUBLE}, {@code DATE}, {@code TIME} or
+     * {@code TIMESTAMP}.
+     *
+     * @param type the schema type to classify
+     * @return {@code true} for the types listed above, {@code false} for any other type
+     */
+    public static boolean isPrimitiveType(SchemaType type) {
+        final boolean primitive;
+        switch (type) {
+            // raw and textual payloads
+            case NONE:
+            case BYTES:
+            case STRING:
+            // boolean and numeric payloads
+            case BOOLEAN:
+            case INT8:
+            case INT16:
+            case INT32:
+            case INT64:
+            case FLOAT:
+            case DOUBLE:
+            // date/time payloads
+            case DATE:
+            case TIME:
+            case TIMESTAMP:
+                primitive = true;
+                break;
+            default:
+                primitive = false;
+                break;
+        }
+        return primitive;
+    }
+
+    /**
+     * Checks whether the given schema type is a struct type: {@code AVRO},
+     * {@code JSON} or {@code PROTOBUF}.
+     *
+     * @param type the schema type to classify
+     * @return {@code true} for the three types listed above, {@code false} otherwise
+     */
+    public static boolean isStructType(SchemaType type) {
+        final boolean struct;
+        switch (type) {
+            case JSON:
+            case AVRO:
+            case PROTOBUF:
+                struct = true;
+                break;
+            default:
+                struct = false;
+                break;
+        }
+        return struct;
+    }
 }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
index 3647c1b..1ee0177 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
@@ -34,9 +34,14 @@ import com.facebook.presto.spi.TableNotFoundException;
 import com.facebook.presto.spi.connector.ConnectorMetadata;
 import com.facebook.presto.spi.type.BigintType;
 import com.facebook.presto.spi.type.BooleanType;
+import com.facebook.presto.spi.type.DateType;
 import com.facebook.presto.spi.type.DoubleType;
 import com.facebook.presto.spi.type.IntegerType;
 import com.facebook.presto.spi.type.RealType;
+import com.facebook.presto.spi.type.SmallintType;
+import com.facebook.presto.spi.type.TimeType;
+import com.facebook.presto.spi.type.TimestampType;
+import com.facebook.presto.spi.type.TinyintType;
 import com.facebook.presto.spi.type.Type;
 import com.facebook.presto.spi.type.VarbinaryType;
 import com.facebook.presto.spi.type.VarcharType;
@@ -55,6 +60,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
 
 import javax.inject.Inject;
 import java.util.HashMap;
@@ -296,6 +302,56 @@ public class PulsarMetadata implements ConnectorMetadata {
                     + String.format("%s/%s", namespace, schemaTableName.getTableName())
                     + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
         }
+        List<ColumnMetadata> handles = getPulsarColumns(
+                topicName, schemaInfo, withInternalColumns
+        );
+
+
+        return new ConnectorTableMetadata(schemaTableName, handles);
+    }
+
+    /**
+     * Converts a Pulsar schema into the list of Presto columns for a topic.
+     *
+     * <p>Struct schemas and primitive schemas are each handed to their dedicated
+     * converter; anything else is rejected.
+     *
+     * @param topicName topic the schema belongs to, passed through to the converter
+     * @param schemaInfo schema to convert; its type selects the converter
+     * @param withInternalColumns passed through to the converter
+     * @return the columns produced by the selected converter
+     * @throws IllegalArgumentException if the schema type is neither struct nor primitive
+     */
+    static List<ColumnMetadata> getPulsarColumns(TopicName topicName,
+                                                 SchemaInfo schemaInfo,
+                                                 boolean withInternalColumns) {
+        final SchemaType type = schemaInfo.getType();
+        if (type.isStruct()) {
+            return getPulsarColumnsFromStructSchema(topicName, schemaInfo, withInternalColumns);
+        }
+        if (type.isPrimitive()) {
+            return getPulsarColumnsFromPrimitiveSchema(topicName, schemaInfo, withInternalColumns);
+        }
+        throw new IllegalArgumentException("Unsupported schema : " + schemaInfo);
+    }
+
+    /**
+     * Builds the Presto columns for a topic whose schema is a primitive type.
+     *
+     * <p>The result always starts with one column named {@code __value__} whose Presto
+     * type is derived from the schema type via {@link #convertPulsarType(SchemaType)}.
+     *
+     * @param topicName topic the schema belongs to (not used by this converter)
+     * @param schemaInfo primitive schema of the topic
+     * @param withInternalColumns when {@code true}, the metadata of every
+     *                            {@link PulsarInternalColumn} is appended after the value column
+     * @return immutable list of column metadata
+     */
+    static List<ColumnMetadata> getPulsarColumnsFromPrimitiveSchema(TopicName topicName,
+                                                                    SchemaInfo schemaInfo,
+                                                                    boolean withInternalColumns) {
+        ImmutableList.Builder<ColumnMetadata> builder = ImmutableList.builder();
+
+        // A primitive message has no fields: the whole payload is the single value column.
+        ColumnMetadata valueColumn = new PulsarColumnMetadata(
+                "__value__",
+                convertPulsarType(schemaInfo.getType()),
+                null, null, false, false,
+                new String[0],
+                new Integer[0]);
+        builder.add(valueColumn);
+
+        if (withInternalColumns) {
+            // forEach is available on the collection itself; no intermediate stream is needed.
+            PulsarInternalColumn.getInternalFields()
+                    .forEach(pulsarInternalColumn -> builder.add(pulsarInternalColumn.getColumnMetadata(false)));
+        }
+
+        return builder.build();
+    }
+
+    static List<ColumnMetadata> getPulsarColumnsFromStructSchema(TopicName topicName,
+                                                                 SchemaInfo schemaInfo,
+                                                                 boolean withInternalColumns) {
 
         String schemaJson = new String(schemaInfo.getSchema());
         if (StringUtils.isBlank(schemaJson)) {
@@ -315,11 +371,44 @@ public class PulsarMetadata implements ConnectorMetadata {
         builder.addAll(getColumns(null, schema, new HashSet<>(), new Stack<>(), new Stack<>()));
 
         if (withInternalColumns) {
-            PulsarInternalColumn.getInternalFields().forEach(
-                pulsarInternalColumn -> builder.add(pulsarInternalColumn.getColumnMetadata(false)));
+            PulsarInternalColumn.getInternalFields()
+                    .stream()
+                    .forEach(pulsarInternalColumn -> builder.add(pulsarInternalColumn.getColumnMetadata(false)));
+        }
+         return builder.build();
+    }
+    /**
+     * Maps a Pulsar schema type to the Presto type used for its column.
+     *
+     * @param pulsarType Pulsar schema type to convert
+     * @return the matching Presto type, or {@code null} (after logging an error) when the
+     *         schema type has no mapping in this switch
+     */
+    @VisibleForTesting
+    static Type convertPulsarType(SchemaType pulsarType) {
+        switch (pulsarType) {
+            case BOOLEAN:
+                return BooleanType.BOOLEAN;
+            case INT8:
+                return TinyintType.TINYINT;
+            case INT16:
+                return SmallintType.SMALLINT;
+            case INT32:
+                return IntegerType.INTEGER;
+            case INT64:
+                return BigintType.BIGINT;
+            case FLOAT:
+                return RealType.REAL;
+            case DOUBLE:
+                return DoubleType.DOUBLE;
+            // NONE shares the VARBINARY mapping with BYTES.
+            case NONE:
+            case BYTES:
+                return VarbinaryType.VARBINARY;
+            case STRING:
+                return VarcharType.VARCHAR;
+            case DATE:
+                return DateType.DATE;
+            case TIME:
+                return TimeType.TIME;
+            case TIMESTAMP:
+                return TimestampType.TIMESTAMP;
+            default:
+                // Unmapped types are reported through the logger; the caller receives null.
+                log.error("Cannot convert type: %s", pulsarType);
+                return null;
         }
-
-        return new ConnectorTableMetadata(schemaTableName, builder.build());
     }
 
 
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPrimitiveSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPrimitiveSchemaHandler.java
new file mode 100644
index 0000000..6e3324b
--- /dev/null
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPrimitiveSchemaHandler.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Date;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+/**
+ * A {@link SchemaHandler} for topics whose messages carry a single primitive value.
+ *
+ * <p>The payload is decoded with the Pulsar {@link Schema} that {@link AutoConsumeSchema}
+ * provides for the topic's {@link SchemaInfo}. The decoded record is itself the only
+ * field value.
+ */
+public class PulsarPrimitiveSchemaHandler implements SchemaHandler {
+
+    private final SchemaInfo schemaInfo;
+    private final Schema<?> schema;
+
+    /**
+     * Creates a handler for the given schema.
+     *
+     * @param schemaInfo schema of the topic; used to obtain the decoder and to choose the
+     *                   post-processing applied in {@link #deserialize(ByteBuf)}
+     */
+    public PulsarPrimitiveSchemaHandler(SchemaInfo schemaInfo) {
+        this.schemaInfo = schemaInfo;
+        this.schema = AutoConsumeSchema.getSchema(schemaInfo);
+    }
+
+    /**
+     * Decodes the bytes of the buffer into the primitive value of the message.
+     *
+     * @param byteBuf buffer holding the encoded message payload
+     * @return for {@code DATE}, {@code TIME} and {@code TIMESTAMP} the value of
+     *         {@code getTime()} on the decoded object; for every other type the decoded
+     *         object unchanged
+     */
+    @Override
+    public Object deserialize(ByteBuf byteBuf) {
+        final Object decoded = schema.decode(ByteBufUtil.getBytes(byteBuf));
+        final Object value;
+        // NOTE(review): all three temporal types are returned as getTime() milliseconds;
+        // confirm the consumer of DATE values expects milliseconds rather than days.
+        switch (schemaInfo.getType()) {
+            case DATE:
+                value = ((Date) decoded).getTime();
+                break;
+            case TIME:
+                value = ((Time) decoded).getTime();
+                break;
+            case TIMESTAMP:
+                value = ((Timestamp) decoded).getTime();
+                break;
+            default:
+                value = decoded;
+                break;
+        }
+        return value;
+    }
+
+    /**
+     * Returns the record itself, whatever the index: a primitive record has no fields.
+     *
+     * @param index ignored
+     * @param currentRecord value previously produced by {@link #deserialize(ByteBuf)}
+     * @return {@code currentRecord}
+     */
+    @Override
+    public Object extractField(int index, Object currentRecord) {
+        return currentRecord;
+    }
+}
\ No newline at end of file
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 09f3fa6..b2e8af5 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -140,9 +140,10 @@ public class PulsarRecordCursor implements RecordCursor {
         this.readOffloaded = pulsarConnectorConfig.getManagedLedgerOffloadDriver() != null;
         this.pulsarConnectorConfig = pulsarConnectorConfig;
 
-        Schema schema = PulsarConnectorUtils.parseSchema(pulsarSplit.getSchema());
-
-        this.schemaHandler = getSchemaHandler(schema, pulsarSplit.getSchemaType(), columnHandles);
+        this.schemaHandler = PulsarSchemaHandlers.newPulsarSchemaHandler(
+                pulsarSplit.getSchemaInfo(),
+                columnHandles
+        );
 
         log.info("Initializing split with parameters: %s", pulsarSplit);
 
@@ -156,22 +157,6 @@ public class PulsarRecordCursor implements RecordCursor {
         }
     }
 
-    private SchemaHandler getSchemaHandler(Schema schema, SchemaType schemaType,
-                                           List<PulsarColumnHandle> columnHandles) {
-        SchemaHandler schemaHandler;
-        switch (schemaType) {
-            case JSON:
-                schemaHandler = new JSONSchemaHandler(columnHandles);
-                break;
-            case AVRO:
-                schemaHandler = new AvroSchemaHandler(schema, columnHandles);
-                break;
-            default:
-                throw new PrestoException(NOT_SUPPORTED, "Not supported schema type: " + schemaType);
-        }
-        return schemaHandler;
-    }
-
     private ReadOnlyCursor getCursor(TopicName topicName, Position startPosition, ManagedLedgerFactory
             managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig)
             throws ManagedLedgerException, InterruptedException {
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java
new file mode 100644
index 0000000..6d304df
--- /dev/null
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto;
+
+import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.facebook.presto.spi.PrestoException;
+import java.util.List;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+/**
+ * Factory for the {@link SchemaHandler} matching a topic's schema.
+ */
+final class PulsarSchemaHandlers {
+
+    /** Holder of static factory methods only; not meant to be instantiated. */
+    private PulsarSchemaHandlers() {
+    }
+
+    /**
+     * Creates the schema handler for the given schema.
+     *
+     * @param schemaInfo schema of the topic being read
+     * @param columnHandles column handles handed to the JSON and AVRO handlers; not used
+     *                      for primitive schemas
+     * @return a {@link PulsarPrimitiveSchemaHandler} for primitive types, a
+     *         {@link JSONSchemaHandler} for JSON, or an {@link AvroSchemaHandler} for AVRO
+     * @throws PrestoException with {@code NOT_SUPPORTED} for every other schema type
+     */
+    static SchemaHandler newPulsarSchemaHandler(SchemaInfo schemaInfo,
+                                                List<PulsarColumnHandle> columnHandles) {
+        if (schemaInfo.getType().isPrimitive()) {
+            return new PulsarPrimitiveSchemaHandler(schemaInfo);
+        }
+        if (!schemaInfo.getType().isStruct()) {
+            throw new PrestoException(
+                    NOT_SUPPORTED,
+                    "Schema `" + schemaInfo.getType() + "` is not supported by presto yet : " + schemaInfo);
+        }
+
+        switch (schemaInfo.getType()) {
+            case JSON:
+                return new JSONSchemaHandler(columnHandles);
+            case AVRO:
+                // The AVRO definition is stored as bytes; decode it explicitly as UTF-8.
+                return new AvroSchemaHandler(
+                        PulsarConnectorUtils.parseSchema(new String(schemaInfo.getSchema(), UTF_8)),
+                        columnHandles);
+            default:
+                // Struct types that have no handler here.
+                throw new PrestoException(NOT_SUPPORTED, "Not supported schema type: " + schemaInfo.getType());
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
index e5688ae..2fdccc4 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
@@ -26,9 +26,11 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
 import java.util.List;
+import java.util.Map;
 
 import static java.util.Objects.requireNonNull;
 
@@ -46,6 +48,7 @@ public class PulsarSplit implements ConnectorSplit {
     private final long startPositionLedgerId;
     private final long endPositionLedgerId;
     private final TupleDomain<ColumnHandle> tupleDomain;
+    private final SchemaInfo schemaInfo;
 
     private final PositionImpl startPosition;
     private final PositionImpl endPosition;
@@ -63,8 +66,16 @@ public class PulsarSplit implements ConnectorSplit {
             @JsonProperty("endPositionEntryId") long endPositionEntryId,
             @JsonProperty("startPositionLedgerId") long startPositionLedgerId,
             @JsonProperty("endPositionLedgerId") long endPositionLedgerId,
-            @JsonProperty("tupleDomain") TupleDomain<ColumnHandle> tupleDomain) {
+            @JsonProperty("tupleDomain") TupleDomain<ColumnHandle> tupleDomain,
+            @JsonProperty("properties") Map<String, String> schemaInfoProperties) {
         this.splitId = splitId;
+        requireNonNull(schemaName, "schema name is null");
+        this.schemaInfo = SchemaInfo.builder()
+                .type(schemaType)
+                .name(schemaName)
+                .schema(schema.getBytes())
+                .properties(schemaInfoProperties)
+                .build();
         this.schemaName = requireNonNull(schemaName, "schema name is null");
         this.connectorId = requireNonNull(connectorId, "connector id is null");
         this.tableName = requireNonNull(tableName, "table name is null");
@@ -179,4 +190,8 @@ public class PulsarSplit implements ConnectorSplit {
                 ", endPositionLedgerId=" + endPositionLedgerId +
                 '}';
     }
+
+    /**
+     * Returns the schema information of this split.
+     *
+     * @return the {@link SchemaInfo} built in the constructor from the split's schema
+     *         type, schema name, schema definition and schema properties
+     */
+    public SchemaInfo getSchemaInfo() {
+        return this.schemaInfo;
+    }
 }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
index 53195a9..15192e4 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
@@ -270,7 +270,8 @@ public class PulsarSplitManager implements ConnectorSplitManager {
                         endPosition.getEntryId(),
                         startPosition.getLedgerId(),
                         endPosition.getLedgerId(),
-                        tupleDomain));
+                        tupleDomain,
+                        schemaInfo.getProperties()));
             }
             return splits;
         } finally {
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index 6a5cd4f..7721324 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -552,7 +552,7 @@ public abstract class TestPulsarConnector {
                         new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()),
                         topicsToSchemas.get(topicName.getSchemaName()).getType(),
                         0, topicsToNumEntries.get(topicName.getSchemaName()),
-                        0, 0, TupleDomain.all()));
+                        0, 0, TupleDomain.all(), new HashMap<>()));
             }
 
             fooFunctions = new HashMap<>();
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
index 40bf57c..829aac9 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
@@ -35,6 +35,8 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import javax.ws.rs.ClientErrorException;
 import javax.ws.rs.core.Response;
+
+import org.apache.pulsar.common.schema.SchemaType;
 import org.testng.annotations.Test;
 
 import java.util.Arrays;
@@ -206,6 +208,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
         updateRewriteNamespaceDelimiterIfNeeded(delimiter);
         SchemaInfo badSchemaInfo = new SchemaInfo();
         badSchemaInfo.setSchema(new byte[0]);
+        badSchemaInfo.setType(SchemaType.AVRO);
         when(this.schemas.getSchemaInfo(eq(TOPIC_1.getSchemaName()))).thenReturn(badSchemaInfo);
 
         PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(
@@ -231,6 +234,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
         updateRewriteNamespaceDelimiterIfNeeded(delimiter);
         SchemaInfo badSchemaInfo = new SchemaInfo();
         badSchemaInfo.setSchema("foo".getBytes());
+        badSchemaInfo.setType(SchemaType.AVRO);
         when(this.schemas.getSchemaInfo(eq(TOPIC_1.getSchemaName()))).thenReturn(badSchemaInfo);
 
         PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarPrimitiveSchemaHandler.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarPrimitiveSchemaHandler.java
new file mode 100644
index 0000000..87e148d
--- /dev/null
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarPrimitiveSchemaHandler.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto;
+
+import com.facebook.presto.spi.ColumnMetadata;
+import io.netty.buffer.ByteBufAllocator;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.client.impl.schema.BooleanSchema;
+import org.apache.pulsar.client.impl.schema.ByteSchema;
+import org.apache.pulsar.client.impl.schema.BytesSchema;
+import org.apache.pulsar.client.impl.schema.DateSchema;
+import org.apache.pulsar.client.impl.schema.DoubleSchema;
+import org.apache.pulsar.client.impl.schema.FloatSchema;
+import org.apache.pulsar.client.impl.schema.IntSchema;
+import org.apache.pulsar.client.impl.schema.LongSchema;
+import org.apache.pulsar.client.impl.schema.ShortSchema;
+import org.apache.pulsar.client.impl.schema.StringSchema;
+import org.apache.pulsar.client.impl.schema.TimeSchema;
+import org.apache.pulsar.client.impl.schema.TimestampSchema;
+import org.apache.pulsar.common.api.raw.RawMessage;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link PulsarPrimitiveSchemaHandler} and for the primitive-schema paths of
+ * {@link PulsarSchemaHandlers} and {@link PulsarMetadata}.
+ *
+ * <p>Assertions follow TestNG's {@code assertEquals(actual, expected)} argument order so
+ * that failure messages report the two values correctly.
+ */
+@Slf4j
+public class TestPulsarPrimitiveSchemaHandler {
+
+    private static final TopicName STRING_TOPIC_NAME = TopicName.get("persistent", "tenant-1", "ns-1", "topic-1");
+
+    /**
+     * Feeds {@code payload} to {@code handler} through a freshly allocated buffer.
+     * The buffer is released afterwards; the primitive handler copies the bytes out of
+     * the buffer before decoding, so the returned value does not depend on it.
+     */
+    private static Object decodeWith(SchemaHandler handler, byte[] payload) {
+        io.netty.buffer.ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer().writeBytes(payload);
+        try {
+            return handler.deserialize(buffer);
+        } finally {
+            buffer.release();
+        }
+    }
+
+    /** Decodes {@code payload} with a primitive handler created for {@code type}. */
+    private static Object decodePrimitive(SchemaType type, byte[] payload) {
+        SchemaInfo schemaInfo = SchemaInfo.builder().type(type).build();
+        return decodeWith(new PulsarPrimitiveSchemaHandler(schemaInfo), payload);
+    }
+
+    /** Round-trips one value of every primitive schema type through the handler. */
+    @Test
+    public void testPulsarPrimitiveSchemaHandler() {
+        byte int8Value = 1;
+        Assert.assertEquals(decodePrimitive(SchemaType.INT8, ByteSchema.of().encode(int8Value)), int8Value);
+
+        short int16Value = 2;
+        Assert.assertEquals(decodePrimitive(SchemaType.INT16, ShortSchema.of().encode(int16Value)), int16Value);
+
+        int int32Value = 2;
+        Assert.assertEquals(decodePrimitive(SchemaType.INT32, IntSchema.of().encode(int32Value)), int32Value);
+
+        long int64Value = 2;
+        Assert.assertEquals(decodePrimitive(SchemaType.INT64, LongSchema.of().encode(int64Value)), int64Value);
+
+        String stringValue = "test";
+        Assert.assertEquals(decodePrimitive(SchemaType.STRING, StringSchema.utf8().encode(stringValue)), stringValue);
+
+        float floatValue = 0.2f;
+        Assert.assertEquals(decodePrimitive(SchemaType.FLOAT, FloatSchema.of().encode(floatValue)), floatValue);
+
+        double doubleValue = 0.22d;
+        Assert.assertEquals(decodePrimitive(SchemaType.DOUBLE, DoubleSchema.of().encode(doubleValue)), doubleValue);
+
+        boolean booleanValue = true;
+        Assert.assertEquals(decodePrimitive(SchemaType.BOOLEAN, BooleanSchema.of().encode(booleanValue)), booleanValue);
+
+        byte[] bytesValue = new byte[] {1};
+        Assert.assertEquals(
+                (byte[]) decodePrimitive(SchemaType.BYTES, BytesSchema.of().encode(bytesValue)), bytesValue);
+
+        // The temporal types are handed back as their getTime() value, not as objects.
+        Date dateValue = new Date(System.currentTimeMillis());
+        Assert.assertEquals(
+                decodePrimitive(SchemaType.DATE, DateSchema.of().encode(dateValue)), dateValue.getTime());
+
+        Time timeValue = new Time(System.currentTimeMillis());
+        Assert.assertEquals(
+                decodePrimitive(SchemaType.TIME, TimeSchema.of().encode(timeValue)), timeValue.getTime());
+
+        Timestamp timestampValue = new Timestamp(System.currentTimeMillis());
+        Assert.assertEquals(
+                decodePrimitive(SchemaType.TIMESTAMP, TimestampSchema.of().encode(timestampValue)),
+                timestampValue.getTime());
+    }
+
+    /** The factory must hand out a working handler for a primitive (string) schema. */
+    @Test
+    public void testNewPulsarPrimitiveSchemaHandler() {
+        SchemaHandler schemaHandler = PulsarSchemaHandlers.newPulsarSchemaHandler(
+                StringSchema.utf8().getSchemaInfo(),
+                null);
+
+        String stringValue = "test";
+        Assert.assertEquals(decodeWith(schemaHandler, StringSchema.utf8().encode(stringValue)), stringValue);
+    }
+
+    /** A string schema without internal columns yields exactly one varchar value column. */
+    @Test
+    public void testNewColumnMetadata() {
+        List<ColumnMetadata> columnMetadataList = PulsarMetadata.getPulsarColumns(STRING_TOPIC_NAME,
+                StringSchema.utf8().getSchemaInfo(), false);
+        Assert.assertEquals(columnMetadataList.size(), 1);
+        ColumnMetadata columnMetadata = columnMetadataList.get(0);
+        Assert.assertEquals(columnMetadata.getName(), "__value__");
+        Assert.assertEquals(columnMetadata.getType().toString(), "varchar");
+    }
+}
\ No newline at end of file