You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/17 07:52:05 UTC

[incubator-pulsar] branch master updated: [schema] add schemas for primtive types (#2500)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a2d75c  [schema] add schemas for primtive types (#2500)
5a2d75c is described below

commit 5a2d75c11acd8c6014ea1f9551405a6bcb41e4f8
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Mon Sep 17 00:51:59 2018 -0700

    [schema] add schemas for primtive types (#2500)
    
    * [schema] add schemas for primtive types
    
    * Fix issues with Schema.BYTES
---
 .../client/api/SchemaSerializationException.java   |   5 +
 .../client/impl/schema/ByteBufferSchema.java       |  75 +++++++++++++++
 .../schema/{StringSchema.java => ByteSchema.java}  |  50 +++++-----
 .../pulsar/client/impl/schema/BytesSchema.java     |  17 +++-
 .../pulsar/client/impl/schema/DoubleSchema.java    |  80 ++++++++++++++++
 .../pulsar/client/impl/schema/FloatSchema.java     |  76 +++++++++++++++
 .../pulsar/client/impl/schema/IntSchema.java       |  75 +++++++++++++++
 .../pulsar/client/impl/schema/LongSchema.java      |  79 ++++++++++++++++
 .../pulsar/client/impl/schema/ShortSchema.java     |  73 +++++++++++++++
 .../pulsar/client/impl/schema/StringSchema.java    |  29 ++++--
 .../pulsar/client/schema/PrimitiveSchemaTest.java  | 102 +++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  11 ++-
 .../apache/pulsar/client/impl/ProducerImpl.java    |   3 +
 .../apache/pulsar/common/schema/SchemaInfo.java    |   2 +
 .../apache/pulsar/common/schema/SchemaType.java    |  40 ++++++++
 15 files changed, 686 insertions(+), 31 deletions(-)

diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java
index 39248d2..46a38c1 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java
@@ -19,6 +19,11 @@
 package org.apache.pulsar.client.api;
 
 public class SchemaSerializationException extends RuntimeException {
+
+    public SchemaSerializationException(String message) {
+        super(message);
+    }
+
     public SchemaSerializationException(Throwable cause) {
         super(cause);
     }
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
new file mode 100644
index 0000000..ee8ba66
--- /dev/null
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import java.nio.ByteBuffer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * A bytebuffer schema.
+ */
+public class ByteBufferSchema implements Schema<ByteBuffer> {
+
+    public static ByteBufferSchema of() {
+        return INSTANCE;
+    }
+
+    private static final ByteBufferSchema INSTANCE = new ByteBufferSchema();
+    private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+        .setName("ByteBuffer")
+        .setType(SchemaType.BYTEBUFFER)
+        .setSchema(new byte[0]);
+
+    @Override
+    public byte[] encode(ByteBuffer data) {
+        if (data == null) {
+            return null;
+        }
+
+        data.rewind();
+
+        if (data.hasArray()) {
+            byte[] arr = data.array();
+            if (data.arrayOffset() == 0 && arr.length == data.remaining()) {
+                return arr;
+            }
+        }
+
+        byte[] ret = new byte[data.remaining()];
+        data.get(ret, 0, ret.length);
+        data.rewind();
+        return ret;
+    }
+
+    @Override
+    public ByteBuffer decode(byte[] data) {
+        if (null == data) {
+            return null;
+        } else {
+            return ByteBuffer.wrap(data);
+        }
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return SCHEMA_INFO;
+    }
+}
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
similarity index 53%
copy from pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
copy to pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
index 4eca216..da82216 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
@@ -19,40 +19,48 @@
 package org.apache.pulsar.client.impl.schema;
 
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-
 /**
- * Schema definition for Strings encoded in UTF-8 format.
+ * A schema for 'Byte'.
  */
-public class StringSchema implements Schema<String> {
-    private final Charset charset;
-    private static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
+public class ByteSchema implements Schema<Byte> {
 
-    public StringSchema() {
-        this.charset = DEFAULT_CHARSET;
+    public static ByteSchema of() {
+        return INSTANCE;
     }
 
-    public StringSchema(Charset charset) {
-        this.charset = charset;
-    }
+    private static final ByteSchema INSTANCE = new ByteSchema();
+    private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+        .setName("INT8")
+        .setType(SchemaType.INT8)
+        .setSchema(new byte[0]);
+
 
-    public byte[] encode(String message) {
-        return message.getBytes(charset);
+    @Override
+    public byte[] encode(Byte message) {
+        if (null == message) {
+            return null;
+        } else {
+            return new byte[]{message};
+        }
     }
 
-    public String decode(byte[] bytes) {
-        return new String(bytes, charset);
+    @Override
+    public Byte decode(byte[] bytes) {
+        if (null == bytes) {
+            return null;
+        }
+        if (bytes.length != 1) {
+            throw new SchemaSerializationException("Size of data received by ByteSchema is not 1");
+        }
+        return bytes[0];
     }
 
+    @Override
     public SchemaInfo getSchemaInfo() {
-        SchemaInfo schemaInfo = new SchemaInfo();
-        schemaInfo.setName("String");
-        schemaInfo.setType(SchemaType.STRING);
-        schemaInfo.setSchema(new byte[0]);
-        return schemaInfo;
+        return SCHEMA_INFO;
     }
 }
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
index 042f04c..9a94fcd 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
@@ -20,8 +20,23 @@ package org.apache.pulsar.client.impl.schema;
 
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
 
+/**
+ * A schema for byte arrays.
+ */
 public class BytesSchema implements Schema<byte[]> {
+
+    public static BytesSchema of() {
+        return INSTANCE;
+    }
+
+    private static final BytesSchema INSTANCE = new BytesSchema();
+    private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+        .setName("Bytes")
+        .setType(SchemaType.BYTES)
+        .setSchema(new byte[0]);
+
     @Override
     public byte[] encode(byte[] message) {
         return message;
@@ -34,6 +49,6 @@ public class BytesSchema implements Schema<byte[]> {
 
     @Override
     public SchemaInfo getSchemaInfo() {
-        return null;
+        return SCHEMA_INFO;
     }
 }
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
new file mode 100644
index 0000000..8ffd9d3
--- /dev/null
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * A schema for `Double`.
+ */
+public class DoubleSchema implements Schema<Double> {
+
+    public static DoubleSchema of() {
+        return INSTANCE;
+    }
+
+    private static final DoubleSchema INSTANCE = new DoubleSchema();
+    private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+        .setName("Double")
+        .setType(SchemaType.DOUBLE)
+        .setSchema(new byte[0]);
+
+    @Override
+    public byte[] encode(Double message) {
+        if (null == message) {
+            return null;
+        } else {
+            long bits = Double.doubleToLongBits(message);
+            return new byte[] {
+                (byte) (bits >>> 56),
+                (byte) (bits >>> 48),
+                (byte) (bits >>> 40),
+                (byte) (bits >>> 32),
+                (byte) (bits >>> 24),
+                (byte) (bits >>> 16),
+                (byte) (bits >>> 8),
+                (byte) bits
+            };
+        }
+    }
+
+    @Override
+    public Double decode(byte[] bytes) {
+        if (null == bytes) {
+            return null;
+        }
+        if (bytes.length != 8) {
+            throw new SchemaSerializationException("Size of data received by DoubleSchema is not 8");
+        }
+        long value = 0;
+        for (byte b : bytes) {
+            value <<= 8;
+            value |= b & 0xFF;
+        }
+        return Double.longBitsToDouble(value);
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return SCHEMA_INFO;
+    }
+}
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
new file mode 100644
index 0000000..b7c61fb
--- /dev/null
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * A schema for `Float`.
+ */
+public class FloatSchema implements Schema<Float> {
+
+    public static FloatSchema of() {
+        return INSTANCE;
+    }
+
+    private static final FloatSchema INSTANCE = new FloatSchema();
+    private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+        .setName("Float")
+        .setType(SchemaType.FLOAT)
+        .setSchema(new byte[0]);
+
+    @Override
+    public byte[] encode(Float message) {
+        if (null == message) {
+            return null;
+        } else {
+            long bits = Float.floatToRawIntBits(message);
+            return new byte[] {
+                (byte) (bits >>> 24),
+                (byte) (bits >>> 16),
+                (byte) (bits >>> 8),
+                (byte) bits
+            };
+        }
+    }
+
+    @Override
+    public Float decode(byte[] bytes) {
+        if (null == bytes) {
+            return null;
+        }
+        if (bytes.length != 4) {
+            throw new SchemaSerializationException("Size of data received by FloatSchema is not 4");
+        }
+        int value = 0;
+        for (byte b : bytes) {
+            value <<= 8;
+            value |= b & 0xFF;
+        }
+        return Float.intBitsToFloat(value);
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return SCHEMA_INFO;
+    }
+}
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
new file mode 100644
index 0000000..33bd73b
--- /dev/null
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * A schema for `Integer`.
+ */
+public class IntSchema implements Schema<Integer> {
+
+    public static IntSchema of() {
+        return INSTANCE;
+    }
+
+    private static final IntSchema INSTANCE = new IntSchema();
+    private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+        .setName("INT32")
+        .setType(SchemaType.INT32)
+        .setSchema(new byte[0]);
+
+    @Override
+    public byte[] encode(Integer message) {
+        if (null == message) {
+            return null;
+        } else {
+            return new byte[] {
+                (byte) (message >>> 24),
+                (byte) (message >>> 16),
+                (byte) (message >>> 8),
+                message.byteValue()
+            };
+        }
+    }
+
+    @Override
+    public Integer decode(byte[] bytes) {
+        if (null == bytes) {
+            return null;
+        }
+        if (bytes.length != 4) {
+            throw new SchemaSerializationException("Size of data received by IntSchema is not 4");
+        }
+        int value = 0;
+        for (byte b : bytes) {
+            value <<= 8;
+            value |= b & 0xFF;
+        }
+        return value;
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return SCHEMA_INFO;
+    }
+}
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
new file mode 100644
index 0000000..e82a901
--- /dev/null
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * A schema for `Long`.
+ */
+public class LongSchema implements Schema<Long> {
+
+    public static LongSchema of() {
+        return INSTANCE;
+    }
+
+    private static final LongSchema INSTANCE = new LongSchema();
+    private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+        .setName("INT64")
+        .setType(SchemaType.INT64)
+        .setSchema(new byte[0]);
+
+    @Override
+    public byte[] encode(Long data) {
+        if (null == data) {
+            return null;
+        } else {
+            return new byte[] {
+                (byte) (data >>> 56),
+                (byte) (data >>> 48),
+                (byte) (data >>> 40),
+                (byte) (data >>> 32),
+                (byte) (data >>> 24),
+                (byte) (data >>> 16),
+                (byte) (data >>> 8),
+                data.byteValue()
+            };
+        }
+    }
+
+    @Override
+    public Long decode(byte[] bytes) {
+        if (null == bytes) {
+            return null;
+        }
+        if (bytes.length != 8) {
+            throw new SchemaSerializationException("Size of data received by LongSchema is not 8");
+        }
+        long value = 0L;
+        for (byte b : bytes) {
+            value <<= 8;
+            value |= b & 0xFF;
+        }
+        return value;
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return SCHEMA_INFO;
+    }
+}
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
new file mode 100644
index 0000000..fc73b89
--- /dev/null
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * A schema for `Short`.
+ */
+public class ShortSchema implements Schema<Short> {
+
+    public static ShortSchema of() {
+        return INSTANCE;
+    }
+
+    private static final ShortSchema INSTANCE = new ShortSchema();
+    private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+        .setName("INT16")
+        .setType(SchemaType.INT16)
+        .setSchema(new byte[0]);
+
+    @Override
+    public byte[] encode(Short message) {
+        if (null == message) {
+            return null;
+        } else {
+            return new byte[] {
+                (byte) (message >>> 8),
+                message.byteValue()
+            };
+        }
+    }
+
+    @Override
+    public Short decode(byte[] bytes) {
+        if (null == bytes) {
+            return null;
+        }
+        if (bytes.length != 2) {
+            throw new SchemaSerializationException("Size of data received by ShortSchema is not 2");
+        }
+        short value = 0;
+        for (byte b : bytes) {
+            value <<= 8;
+            value |= b & 0xFF;
+        }
+        return value;
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return SCHEMA_INFO;
+    }
+}
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
index 4eca216..11b5c5f 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
@@ -29,6 +29,17 @@ import java.nio.charset.StandardCharsets;
  * Schema definition for Strings encoded in UTF-8 format.
  */
 public class StringSchema implements Schema<String> {
+
+    public static StringSchema utf8() {
+        return UTF8;
+    }
+
+    private static final StringSchema UTF8 = new StringSchema(StandardCharsets.UTF_8);
+    private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+        .setName("String")
+        .setType(SchemaType.STRING)
+        .setSchema(new byte[0]);
+
     private final Charset charset;
     private static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
 
@@ -41,18 +52,22 @@ public class StringSchema implements Schema<String> {
     }
 
     public byte[] encode(String message) {
-        return message.getBytes(charset);
+        if (null == message) {
+            return null;
+        } else {
+            return message.getBytes(charset);
+        }
     }
 
     public String decode(byte[] bytes) {
-        return new String(bytes, charset);
+        if (null == bytes) {
+            return null;
+        } else {
+            return new String(bytes, charset);
+        }
     }
 
     public SchemaInfo getSchemaInfo() {
-        SchemaInfo schemaInfo = new SchemaInfo();
-        schemaInfo.setName("String");
-        schemaInfo.setType(SchemaType.STRING);
-        schemaInfo.setSchema(new byte[0]);
-        return schemaInfo;
+        return SCHEMA_INFO;
     }
 }
diff --git a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/PrimitiveSchemaTest.java b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/PrimitiveSchemaTest.java
new file mode 100644
index 0000000..fdac539
--- /dev/null
+++ b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/PrimitiveSchemaTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.schema;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.ByteBufferSchema;
+import org.apache.pulsar.client.impl.schema.ByteSchema;
+import org.apache.pulsar.client.impl.schema.BytesSchema;
+import org.apache.pulsar.client.impl.schema.DoubleSchema;
+import org.apache.pulsar.client.impl.schema.FloatSchema;
+import org.apache.pulsar.client.impl.schema.IntSchema;
+import org.apache.pulsar.client.impl.schema.LongSchema;
+import org.apache.pulsar.client.impl.schema.ShortSchema;
+import org.apache.pulsar.client.impl.schema.StringSchema;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for primitive schemas.
+ */
+@Slf4j
+public class PrimitiveSchemaTest {
+
+    final private Map<Schema, List<Object>> testData = new HashMap() {
+        {
+            put(StringSchema.utf8(), Arrays.asList("my string"));
+            put(ByteSchema.of(), Arrays.asList((byte) 32767, (byte) -32768));
+            put(ShortSchema.of(), Arrays.asList((short) 32767, (short) -32768));
+            put(IntSchema.of(), Arrays.asList((int) 423412424, (int) -41243432));
+            put(LongSchema.of(), Arrays.asList(922337203685477580L, -922337203685477581L));
+            put(FloatSchema.of(), Arrays.asList(5678567.12312f, -5678567.12341f));
+            put(DoubleSchema.of(), Arrays.asList(5678567.12312d, -5678567.12341d));
+            put(BytesSchema.of(), Arrays.asList("my string".getBytes()));
+            put(ByteBufferSchema.of(), Arrays.asList(ByteBuffer.allocate(10).put("my string".getBytes())));
+        }
+    };
+
+    @Test
+    public void allSchemasShouldSupportNull() {
+        for (Schema<?> schema : testData.keySet()) {
+            assertNull(schema.encode(null),
+                "Should support null in " + schema.getSchemaInfo().getName() + " serialization");
+            assertNull(schema.decode( null),
+                "Should support null in " + schema.getSchemaInfo().getName() + " deserialization");
+        }
+    }
+
+    @Test
+    public void allSchemasShouldRoundtripInput() {
+        for (Map.Entry<Schema, List<Object>> test : testData.entrySet()) {
+            log.info("Test schema {}", test.getKey());
+            for (Object value : test.getValue()) {
+                log.info("Encode : {}", value);
+                assertEquals(value,
+                    test.getKey().decode(test.getKey().encode(value)),
+                    "Should get the original " + test.getKey().getSchemaInfo().getName() +
+                        " after serialization and deserialization");
+            }
+        }
+    }
+
+    @Test
+    public void allSchemasShouldHaveSchemaType() {
+        assertEquals(SchemaType.INT8, ByteSchema.of().getSchemaInfo().getType());
+        assertEquals(SchemaType.INT16, ShortSchema.of().getSchemaInfo().getType());
+        assertEquals(SchemaType.INT32, IntSchema.of().getSchemaInfo().getType());
+        assertEquals(SchemaType.INT64, LongSchema.of().getSchemaInfo().getType());
+        assertEquals(SchemaType.FLOAT, FloatSchema.of().getSchemaInfo().getType());
+        assertEquals(SchemaType.DOUBLE, DoubleSchema.of().getSchemaInfo().getType());
+        assertEquals(SchemaType.STRING, StringSchema.utf8().getSchemaInfo().getType());
+        assertEquals(SchemaType.BYTES, BytesSchema.of().getSchemaInfo().getType());
+        assertEquals(SchemaType.BYTEBUFFER, ByteBufferSchema.of().getSchemaInfo().getType());
+
+    }
+
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b7f9918..f1db613 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -66,6 +66,7 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
 import org.apache.pulsar.common.api.PulsarDecoder;
@@ -82,7 +83,8 @@ import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
 import org.apache.pulsar.common.compression.CompressionCodec;
 import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -535,8 +537,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             builder.recycle();
         }
 
+        SchemaInfo si = schema.getSchemaInfo();
+        if (SchemaType.BYTES == si.getType()) {
+            // don't set schema for Schema.BYTES
+            si = null;
+        }
         ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel,
-                consumerName, isDurable, startMessageIdData, metadata, readCompacted, InitialPosition.valueOf(subscriptionInitialPosition.getValue()), schema.getSchemaInfo());
+                consumerName, isDurable, startMessageIdData, metadata, readCompacted, InitialPosition.valueOf(subscriptionInitialPosition.getValue()), si);
         if (startMessageIdData != null) {
             startMessageIdData.recycle();
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 9401e7c..8551510 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -900,6 +900,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                         JSONSchema jsonSchema = (JSONSchema) schema;
                         schemaInfo = jsonSchema.getBackwardsCompatibleJsonSchemaInfo();
                     }
+                } else if (schema.getSchemaInfo().getType() == SchemaType.BYTES) {
+                    // don't set schema info for Schema.BYTES
+                    schemaInfo = null;
                 } else {
                     schemaInfo = schema.getSchemaInfo();
                 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
index 9721444..279b628 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
@@ -27,6 +27,7 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 
+import lombok.experimental.Accessors;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 import org.apache.pulsar.common.api.proto.PulsarApi.Schema;
@@ -34,6 +35,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.Schema;
 @Data
 @AllArgsConstructor
 @NoArgsConstructor
+@Accessors(chain = true)
 public class SchemaInfo {
 
     @EqualsAndHashCode.Exclude
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
index 88adb53..87c0956 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
@@ -33,6 +33,46 @@ public enum SchemaType {
     STRING,
 
     /**
+     * An 8-bit integer.
+     */
+    INT8,
+
+    /**
+     * A 16-bit integer.
+     */
+    INT16,
+
+    /**
+     * A 32-bit integer.
+     */
+    INT32,
+
+    /**
+     * A 64-bit integer.
+     */
+    INT64,
+
+    /**
+     * A float number.
+     */
+    FLOAT,
+
+    /**
+     * A double number.
+     */
+    DOUBLE,
+
+    /**
+     * A byte array.
+     */
+    BYTES,
+
+    /**
+     * A bytebuffer.
+     */
+    BYTEBUFFER,
+
+    /**
      * JSON object encoding and validation
      */
     JSON,