You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/07/24 01:53:12 UTC
[pulsar] branch master updated: Pulsar SQL supports pulsar's
primitive schema (#4728)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1ab35b0 Pulsar SQL supports pulsar's primitive schema (#4728)
1ab35b0 is described below
commit 1ab35b01bcf931eb3b65a5660eca62a9ac4ceb1d
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Wed Jul 24 09:53:07 2019 +0800
Pulsar SQL supports pulsar's primitive schema (#4728)
### Motivation
Continue the PR of #4151
---
.../apache/pulsar/common/schema/SchemaType.java | 42 ++++++
.../apache/pulsar/sql/presto/PulsarMetadata.java | 97 +++++++++++-
.../sql/presto/PulsarPrimitiveSchemaHandler.java | 65 +++++++++
.../pulsar/sql/presto/PulsarRecordCursor.java | 23 +--
.../pulsar/sql/presto/PulsarSchemaHandlers.java | 53 +++++++
.../org/apache/pulsar/sql/presto/PulsarSplit.java | 17 ++-
.../pulsar/sql/presto/PulsarSplitManager.java | 3 +-
.../pulsar/sql/presto/TestPulsarConnector.java | 2 +-
.../pulsar/sql/presto/TestPulsarMetadata.java | 4 +
.../presto/TestPulsarPrimitiveSchemaHandler.java | 162 +++++++++++++++++++++
10 files changed, 442 insertions(+), 26 deletions(-)
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
index 138b3ec..67cb090 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
@@ -175,4 +175,46 @@ public enum SchemaType {
default: return NONE;
}
}
+
+
+ public boolean isPrimitive() {
+ return isPrimitiveType(this);
+ }
+
+ public boolean isStruct() {
+ return isStructType(this);
+ }
+
+ public static boolean isPrimitiveType(SchemaType type) {
+ switch (type) {
+ case STRING:
+ case BOOLEAN:
+ case INT8:
+ case INT16:
+ case INT32:
+ case INT64:
+ case FLOAT:
+ case DOUBLE:
+ case DATE:
+ case TIME:
+ case TIMESTAMP:
+ case BYTES:
+ case NONE:
+ return true;
+ default:
+ return false;
+ }
+
+ }
+
+ public static boolean isStructType(SchemaType type) {
+ switch (type) {
+ case AVRO:
+ case JSON:
+ case PROTOBUF:
+ return true;
+ default:
+ return false;
+ }
+ }
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
index 3647c1b..1ee0177 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
@@ -34,9 +34,14 @@ import com.facebook.presto.spi.TableNotFoundException;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.type.BigintType;
import com.facebook.presto.spi.type.BooleanType;
+import com.facebook.presto.spi.type.DateType;
import com.facebook.presto.spi.type.DoubleType;
import com.facebook.presto.spi.type.IntegerType;
import com.facebook.presto.spi.type.RealType;
+import com.facebook.presto.spi.type.SmallintType;
+import com.facebook.presto.spi.type.TimeType;
+import com.facebook.presto.spi.type.TimestampType;
+import com.facebook.presto.spi.type.TinyintType;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.VarbinaryType;
import com.facebook.presto.spi.type.VarcharType;
@@ -55,6 +60,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
import javax.inject.Inject;
import java.util.HashMap;
@@ -296,6 +302,56 @@ public class PulsarMetadata implements ConnectorMetadata {
+ String.format("%s/%s", namespace, schemaTableName.getTableName())
+ ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
}
+ List<ColumnMetadata> handles = getPulsarColumns(
+ topicName, schemaInfo, withInternalColumns
+ );
+
+
+ return new ConnectorTableMetadata(schemaTableName, handles);
+ }
+
+ /**
+ * Convert pulsar schema into presto table metadata.
+ */
+ static List<ColumnMetadata> getPulsarColumns(TopicName topicName,
+ SchemaInfo schemaInfo,
+ boolean withInternalColumns) {
+ SchemaType schemaType = schemaInfo.getType();
+ if (schemaType.isStruct()) {
+ return getPulsarColumnsFromStructSchema(topicName, schemaInfo, withInternalColumns);
+ } else if (schemaType.isPrimitive()) {
+ return getPulsarColumnsFromPrimitiveSchema(topicName, schemaInfo, withInternalColumns);
+ } else {
+ throw new IllegalArgumentException("Unsupported schema : " + schemaInfo);
+ }
+ }
+
+ static List<ColumnMetadata> getPulsarColumnsFromPrimitiveSchema(TopicName topicName,
+ SchemaInfo schemaInfo,
+ boolean withInternalColumns) {
+ ImmutableList.Builder<ColumnMetadata> builder = ImmutableList.builder();
+
+ ColumnMetadata valueColumn = new PulsarColumnMetadata(
+ "__value__",
+ convertPulsarType(schemaInfo.getType()),
+ null, null, false, false,
+ new String[0],
+ new Integer[0]);
+
+ builder.add(valueColumn);
+
+ if (withInternalColumns) {
+ PulsarInternalColumn.getInternalFields()
+ .stream()
+ .forEach(pulsarInternalColumn -> builder.add(pulsarInternalColumn.getColumnMetadata(false)));
+ }
+
+ return builder.build();
+ }
+
+ static List<ColumnMetadata> getPulsarColumnsFromStructSchema(TopicName topicName,
+ SchemaInfo schemaInfo,
+ boolean withInternalColumns) {
String schemaJson = new String(schemaInfo.getSchema());
if (StringUtils.isBlank(schemaJson)) {
@@ -315,11 +371,44 @@ public class PulsarMetadata implements ConnectorMetadata {
builder.addAll(getColumns(null, schema, new HashSet<>(), new Stack<>(), new Stack<>()));
if (withInternalColumns) {
- PulsarInternalColumn.getInternalFields().forEach(
- pulsarInternalColumn -> builder.add(pulsarInternalColumn.getColumnMetadata(false)));
+ PulsarInternalColumn.getInternalFields()
+ .stream()
+ .forEach(pulsarInternalColumn -> builder.add(pulsarInternalColumn.getColumnMetadata(false)));
+ }
+ return builder.build();
+ }
+ @VisibleForTesting
+ static Type convertPulsarType(SchemaType pulsarType) {
+ switch (pulsarType) {
+ case BOOLEAN:
+ return BooleanType.BOOLEAN;
+ case INT8:
+ return TinyintType.TINYINT;
+ case INT16:
+ return SmallintType.SMALLINT;
+ case INT32:
+ return IntegerType.INTEGER;
+ case INT64:
+ return BigintType.BIGINT;
+ case FLOAT:
+ return RealType.REAL;
+ case DOUBLE:
+ return DoubleType.DOUBLE;
+ case NONE:
+ case BYTES:
+ return VarbinaryType.VARBINARY;
+ case STRING:
+ return VarcharType.VARCHAR;
+ case DATE:
+ return DateType.DATE;
+ case TIME:
+ return TimeType.TIME;
+ case TIMESTAMP:
+ return TimestampType.TIMESTAMP;
+ default:
+ log.error("Cannot convert type: %s", pulsarType);
+ return null;
}
-
- return new ConnectorTableMetadata(schemaTableName, builder.build());
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPrimitiveSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPrimitiveSchemaHandler.java
new file mode 100644
index 0000000..6e3324b
--- /dev/null
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPrimitiveSchemaHandler.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Date;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+/**
+ * A presto schema handler that interprets data using pulsar schema.
+ */
+public class PulsarPrimitiveSchemaHandler implements SchemaHandler {
+
+ private final SchemaInfo schemaInfo;
+ private final Schema<?> schema;
+
+ public PulsarPrimitiveSchemaHandler(SchemaInfo schemaInfo) {
+ this.schemaInfo = schemaInfo;
+ this.schema = AutoConsumeSchema.getSchema(schemaInfo);
+ }
+
+ @Override
+ public Object deserialize(ByteBuf byteBuf) {
+ byte[] data = ByteBufUtil.getBytes(byteBuf);
+ Object currentRecord = schema.decode(data);
+ switch (schemaInfo.getType()) {
+ case DATE:
+ return ((Date) currentRecord).getTime();
+ case TIME:
+ return ((Time) currentRecord).getTime();
+ case TIMESTAMP:
+ return ((Timestamp) currentRecord).getTime();
+ default:
+ return currentRecord;
+ }
+ }
+
+ @Override
+ public Object extractField(int index, Object currentRecord) {
+ return currentRecord;
+ }
+}
\ No newline at end of file
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
index 09f3fa6..b2e8af5 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java
@@ -140,9 +140,10 @@ public class PulsarRecordCursor implements RecordCursor {
this.readOffloaded = pulsarConnectorConfig.getManagedLedgerOffloadDriver() != null;
this.pulsarConnectorConfig = pulsarConnectorConfig;
- Schema schema = PulsarConnectorUtils.parseSchema(pulsarSplit.getSchema());
-
- this.schemaHandler = getSchemaHandler(schema, pulsarSplit.getSchemaType(), columnHandles);
+ this.schemaHandler = PulsarSchemaHandlers.newPulsarSchemaHandler(
+ pulsarSplit.getSchemaInfo(),
+ columnHandles
+ );
log.info("Initializing split with parameters: %s", pulsarSplit);
@@ -156,22 +157,6 @@ public class PulsarRecordCursor implements RecordCursor {
}
}
- private SchemaHandler getSchemaHandler(Schema schema, SchemaType schemaType,
- List<PulsarColumnHandle> columnHandles) {
- SchemaHandler schemaHandler;
- switch (schemaType) {
- case JSON:
- schemaHandler = new JSONSchemaHandler(columnHandles);
- break;
- case AVRO:
- schemaHandler = new AvroSchemaHandler(schema, columnHandles);
- break;
- default:
- throw new PrestoException(NOT_SUPPORTED, "Not supported schema type: " + schemaType);
- }
- return schemaHandler;
- }
-
private ReadOnlyCursor getCursor(TopicName topicName, Position startPosition, ManagedLedgerFactory
managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig)
throws ManagedLedgerException, InterruptedException {
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java
new file mode 100644
index 0000000..6d304df
--- /dev/null
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSchemaHandlers.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto;
+
+import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.facebook.presto.spi.PrestoException;
+import java.util.List;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+class PulsarSchemaHandlers {
+
+ static SchemaHandler newPulsarSchemaHandler(SchemaInfo schemaInfo,
+ List<PulsarColumnHandle> columnHandles) {
+ if (schemaInfo.getType().isPrimitive()) {
+ return new PulsarPrimitiveSchemaHandler(schemaInfo);
+ } else if (schemaInfo.getType().isStruct()) {
+ switch (schemaInfo.getType()) {
+ case JSON:
+ return new JSONSchemaHandler(columnHandles);
+ case AVRO:
+ return new AvroSchemaHandler(PulsarConnectorUtils
+ .parseSchema(new String(schemaInfo.getSchema(), UTF_8)
+ ), columnHandles);
+ default:
+ throw new PrestoException(NOT_SUPPORTED, "Not supported schema type: " + schemaInfo.getType());
+ }
+
+ } else {
+ throw new PrestoException(
+ NOT_SUPPORTED,
+ "Schema `" + schemaInfo.getType() + "` is not supported by presto yet : " + schemaInfo);
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
index e5688ae..2fdccc4 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
@@ -26,9 +26,11 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.util.List;
+import java.util.Map;
import static java.util.Objects.requireNonNull;
@@ -46,6 +48,7 @@ public class PulsarSplit implements ConnectorSplit {
private final long startPositionLedgerId;
private final long endPositionLedgerId;
private final TupleDomain<ColumnHandle> tupleDomain;
+ private final SchemaInfo schemaInfo;
private final PositionImpl startPosition;
private final PositionImpl endPosition;
@@ -63,8 +66,16 @@ public class PulsarSplit implements ConnectorSplit {
@JsonProperty("endPositionEntryId") long endPositionEntryId,
@JsonProperty("startPositionLedgerId") long startPositionLedgerId,
@JsonProperty("endPositionLedgerId") long endPositionLedgerId,
- @JsonProperty("tupleDomain") TupleDomain<ColumnHandle> tupleDomain) {
+ @JsonProperty("tupleDomain") TupleDomain<ColumnHandle> tupleDomain,
+ @JsonProperty("properties") Map<String, String> schemaInfoProperties) {
this.splitId = splitId;
+ requireNonNull(schemaName, "schema name is null");
+ this.schemaInfo = SchemaInfo.builder()
+ .type(schemaType)
+ .name(schemaName)
+ .schema(schema.getBytes())
+ .properties(schemaInfoProperties)
+ .build();
this.schemaName = requireNonNull(schemaName, "schema name is null");
this.connectorId = requireNonNull(connectorId, "connector id is null");
this.tableName = requireNonNull(tableName, "table name is null");
@@ -179,4 +190,8 @@ public class PulsarSplit implements ConnectorSplit {
", endPositionLedgerId=" + endPositionLedgerId +
'}';
}
+
+ public SchemaInfo getSchemaInfo() {
+ return schemaInfo;
+ }
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
index 53195a9..15192e4 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
@@ -270,7 +270,8 @@ public class PulsarSplitManager implements ConnectorSplitManager {
endPosition.getEntryId(),
startPosition.getLedgerId(),
endPosition.getLedgerId(),
- tupleDomain));
+ tupleDomain,
+ schemaInfo.getProperties()));
}
return splits;
} finally {
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index 6a5cd4f..7721324 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -552,7 +552,7 @@ public abstract class TestPulsarConnector {
new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()),
topicsToSchemas.get(topicName.getSchemaName()).getType(),
0, topicsToNumEntries.get(topicName.getSchemaName()),
- 0, 0, TupleDomain.all()));
+ 0, 0, TupleDomain.all(), new HashMap<>()));
}
fooFunctions = new HashMap<>();
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
index 40bf57c..829aac9 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
@@ -35,6 +35,8 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;
import javax.ws.rs.ClientErrorException;
import javax.ws.rs.core.Response;
+
+import org.apache.pulsar.common.schema.SchemaType;
import org.testng.annotations.Test;
import java.util.Arrays;
@@ -206,6 +208,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
SchemaInfo badSchemaInfo = new SchemaInfo();
badSchemaInfo.setSchema(new byte[0]);
+ badSchemaInfo.setType(SchemaType.AVRO);
when(this.schemas.getSchemaInfo(eq(TOPIC_1.getSchemaName()))).thenReturn(badSchemaInfo);
PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(
@@ -231,6 +234,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
SchemaInfo badSchemaInfo = new SchemaInfo();
badSchemaInfo.setSchema("foo".getBytes());
+ badSchemaInfo.setType(SchemaType.AVRO);
when(this.schemas.getSchemaInfo(eq(TOPIC_1.getSchemaName()))).thenReturn(badSchemaInfo);
PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarPrimitiveSchemaHandler.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarPrimitiveSchemaHandler.java
new file mode 100644
index 0000000..87e148d
--- /dev/null
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarPrimitiveSchemaHandler.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.sql.presto;
+
+import com.facebook.presto.spi.ColumnMetadata;
+import io.netty.buffer.ByteBufAllocator;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.client.impl.schema.BooleanSchema;
+import org.apache.pulsar.client.impl.schema.ByteSchema;
+import org.apache.pulsar.client.impl.schema.BytesSchema;
+import org.apache.pulsar.client.impl.schema.DateSchema;
+import org.apache.pulsar.client.impl.schema.DoubleSchema;
+import org.apache.pulsar.client.impl.schema.FloatSchema;
+import org.apache.pulsar.client.impl.schema.IntSchema;
+import org.apache.pulsar.client.impl.schema.LongSchema;
+import org.apache.pulsar.client.impl.schema.ShortSchema;
+import org.apache.pulsar.client.impl.schema.StringSchema;
+import org.apache.pulsar.client.impl.schema.TimeSchema;
+import org.apache.pulsar.client.impl.schema.TimestampSchema;
+import org.apache.pulsar.common.api.raw.RawMessage;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@Slf4j
+public class TestPulsarPrimitiveSchemaHandler {
+
+ private static final TopicName stringTopicName = TopicName.get("persistent", "tenant-1", "ns-1", "topic-1");
+ @Test
+ public void testPulsarPrimitiveSchemaHandler() {
+ PulsarPrimitiveSchemaHandler pulsarPrimitiveSchemaHandler;
+ RawMessage rawMessage = mock(RawMessage.class);
+ SchemaInfo schemaInfoInt8 = SchemaInfo.builder().type(SchemaType.INT8).build();
+ pulsarPrimitiveSchemaHandler = new PulsarPrimitiveSchemaHandler(schemaInfoInt8);
+ byte int8Value = 1;
+ when(rawMessage.getData()).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(ByteSchema.of().encode(int8Value)));
+ Assert.assertEquals(int8Value, (byte)pulsarPrimitiveSchemaHandler.deserialize(rawMessage.getData()));
+
+ SchemaInfo schemaInfoInt16 = SchemaInfo.builder().type(SchemaType.INT16).build();
+ pulsarPrimitiveSchemaHandler = new PulsarPrimitiveSchemaHandler(schemaInfoInt16);
+ short int16Value = 2;
+ when(rawMessage.getData()).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(ShortSchema.of().encode(int16Value)));
+ Assert.assertEquals(int16Value, pulsarPrimitiveSchemaHandler.deserialize(rawMessage.getData()));
+
+ SchemaInfo schemaInfoInt32 = SchemaInfo.builder().type(SchemaType.INT32).build();
+ pulsarPrimitiveSchemaHandler = new PulsarPrimitiveSchemaHandler(schemaInfoInt32);
+ int int32Value = 2;
+ when(rawMessage.getData()).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(IntSchema.of().encode(int32Value)));
+ Assert.assertEquals(int32Value, pulsarPrimitiveSchemaHandler.deserialize(rawMessage.getData()));
+
+ SchemaInfo schemaInfoInt64 = SchemaInfo.builder().type(SchemaType.INT64).build();
+ pulsarPrimitiveSchemaHandler = new PulsarPrimitiveSchemaHandler(schemaInfoInt64);
+ long int64Value = 2;
+ when(rawMessage.getData()).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(LongSchema.of().encode(int64Value)));
+ Assert.assertEquals(int64Value, pulsarPrimitiveSchemaHandler.deserialize(rawMessage.getData()));
+
+ SchemaInfo schemaInfoString = SchemaInfo.builder().type(SchemaType.STRING).build();
+ pulsarPrimitiveSchemaHandler = new PulsarPrimitiveSchemaHandler(schemaInfoString);
+ String stringValue = "test";
+ when(rawMessage.getData()).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(StringSchema.utf8().encode(stringValue)));
+ Assert.assertEquals(stringValue, pulsarPrimitiveSchemaHandler.deserialize(rawMessage.getData()));
+
+ SchemaInfo schemaInfoFloat = SchemaInfo.builder().type(SchemaType.FLOAT).build();
+ pulsarPrimitiveSchemaHandler = new PulsarPrimitiveSchemaHandler(schemaInfoFloat);
+ float floatValue = 0.2f;
+ when(rawMessage.getData()).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(FloatSchema.of().encode(floatValue)));
+ Assert.assertEquals(floatValue, pulsarPrimitiveSchemaHandler.deserialize(rawMessage.getData()));
+
+ SchemaInfo schemaInfoDouble = SchemaInfo.builder().type(SchemaType.DOUBLE).build();
+ pulsarPrimitiveSchemaHandler = new PulsarPrimitiveSchemaHandler(schemaInfoDouble);
+ double doubleValue = 0.22d;
+ when(rawMessage.getData()).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(DoubleSchema.of().encode(doubleValue)));
+ Assert.assertEquals(doubleValue, pulsarPrimitiveSchemaHandler.deserialize(rawMessage.getData()));
+
+ SchemaInfo schemaInfoBoolean = SchemaInfo.builder().type(SchemaType.BOOLEAN).build();
+ pulsarPrimitiveSchemaHandler = new PulsarPrimitiveSchemaHandler(schemaInfoBoolean);
+ boolean booleanValue = true;
+ when(rawMessage.getData()).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(BooleanSchema.of().encode(booleanValue)));
+ Assert.assertEquals(booleanValue, pulsarPrimitiveSchemaHandler.deserialize(rawMessage.getData()));
+
+ SchemaInfo schemaInfoBytes = SchemaInfo.builder().type(SchemaType.BYTES).build();
+ pulsarPrimitiveSchemaHandler = new PulsarPrimitiveSchemaHandler(schemaInfoBytes);
+ byte[] bytesValue = new byte[1];
+ bytesValue[0] = 1;
+ when(rawMessage.getData()).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(BytesSchema.of().encode(bytesValue)));
+ Assert.assertEquals(bytesValue, pulsarPrimitiveSchemaHandler.deserialize(rawMessage.getData()));
+
+ SchemaInfo schemaInfoDate = SchemaInfo.builder().type(SchemaType.DATE).build();
+ pulsarPrimitiveSchemaHandler = new PulsarPrimitiveSchemaHandler(schemaInfoDate);
+ Date dateValue = new Date(System.currentTimeMillis());
+ when(rawMessage.getData()).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(DateSchema.of().encode(dateValue)));
+ Object dateDeserializeValue = pulsarPrimitiveSchemaHandler.deserialize(rawMessage.getData());
+ Assert.assertEquals(dateValue.getTime(), dateDeserializeValue);
+
+ SchemaInfo schemaInfoTime = SchemaInfo.builder().type(SchemaType.TIME).build();
+ pulsarPrimitiveSchemaHandler = new PulsarPrimitiveSchemaHandler(schemaInfoTime);
+ Time timeValue = new Time(System.currentTimeMillis());
+ when(rawMessage.getData()).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(TimeSchema.of().encode(timeValue)));
+ Object timeDeserializeValue = pulsarPrimitiveSchemaHandler.deserialize(rawMessage.getData());
+ Assert.assertEquals(timeValue.getTime(), timeDeserializeValue);
+
+ SchemaInfo schemaInfoTimestamp = SchemaInfo.builder().type(SchemaType.TIMESTAMP).build();
+ pulsarPrimitiveSchemaHandler = new PulsarPrimitiveSchemaHandler(schemaInfoTimestamp);
+ Timestamp timestampValue = new Timestamp(System.currentTimeMillis());
+ when(rawMessage.getData()).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(TimestampSchema.of().encode(timestampValue)));
+ Object timestampDeserializeValue = pulsarPrimitiveSchemaHandler.deserialize(rawMessage.getData());
+ Assert.assertEquals(timestampValue.getTime(), timestampDeserializeValue);
+ }
+
+ @Test
+ public void testNewPulsarPrimitiveSchemaHandler() {
+ RawMessage rawMessage = mock(RawMessage.class);
+ SchemaHandler schemaHandler = PulsarSchemaHandlers.newPulsarSchemaHandler(
+ StringSchema.utf8().getSchemaInfo(),
+ null);
+
+ String stringValue = "test";
+ when(rawMessage.getData()).thenReturn(ByteBufAllocator.DEFAULT.buffer().writeBytes(StringSchema.utf8().encode(stringValue)));
+
+ Object deserializeValue = schemaHandler.deserialize(rawMessage.getData());
+ Assert.assertEquals(stringValue, (String)deserializeValue);
+ Assert.assertEquals(stringValue, (String)deserializeValue);
+
+ }
+
+ @Test
+ public void testNewColumnMetadata() {
+ List<ColumnMetadata> columnMetadataList = PulsarMetadata.getPulsarColumns(stringTopicName,
+ StringSchema.utf8().getSchemaInfo(), false);
+ Assert.assertEquals(columnMetadataList.size(), 1);
+ ColumnMetadata columnMetadata = columnMetadataList.get(0);
+ Assert.assertEquals("__value__", columnMetadata.getName());
+ Assert.assertEquals("varchar", columnMetadata.getType().toString());
+ }
+}
\ No newline at end of file