You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/09/17 07:52:05 UTC
[incubator-pulsar] branch master updated: [schema] add schemas for
primtive types (#2500)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5a2d75c [schema] add schemas for primtive types (#2500)
5a2d75c is described below
commit 5a2d75c11acd8c6014ea1f9551405a6bcb41e4f8
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Mon Sep 17 00:51:59 2018 -0700
[schema] add schemas for primtive types (#2500)
* [schema] add schemas for primtive types
* Fix issues with Schema.BYTES
---
.../client/api/SchemaSerializationException.java | 5 +
.../client/impl/schema/ByteBufferSchema.java | 75 +++++++++++++++
.../schema/{StringSchema.java => ByteSchema.java} | 50 +++++-----
.../pulsar/client/impl/schema/BytesSchema.java | 17 +++-
.../pulsar/client/impl/schema/DoubleSchema.java | 80 ++++++++++++++++
.../pulsar/client/impl/schema/FloatSchema.java | 76 +++++++++++++++
.../pulsar/client/impl/schema/IntSchema.java | 75 +++++++++++++++
.../pulsar/client/impl/schema/LongSchema.java | 79 ++++++++++++++++
.../pulsar/client/impl/schema/ShortSchema.java | 73 +++++++++++++++
.../pulsar/client/impl/schema/StringSchema.java | 29 ++++--
.../pulsar/client/schema/PrimitiveSchemaTest.java | 102 +++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 11 ++-
.../apache/pulsar/client/impl/ProducerImpl.java | 3 +
.../apache/pulsar/common/schema/SchemaInfo.java | 2 +
.../apache/pulsar/common/schema/SchemaType.java | 40 ++++++++
15 files changed, 686 insertions(+), 31 deletions(-)
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java
index 39248d2..46a38c1 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/api/SchemaSerializationException.java
@@ -19,6 +19,11 @@
package org.apache.pulsar.client.api;
public class SchemaSerializationException extends RuntimeException {
+
+ public SchemaSerializationException(String message) {
+ super(message);
+ }
+
public SchemaSerializationException(Throwable cause) {
super(cause);
}
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
new file mode 100644
index 0000000..ee8ba66
--- /dev/null
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import java.nio.ByteBuffer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * A schema for `ByteBuffer`.
+ */
+public class ByteBufferSchema implements Schema<ByteBuffer> {
+
+ public static ByteBufferSchema of() {
+ return INSTANCE;
+ }
+
+ private static final ByteBufferSchema INSTANCE = new ByteBufferSchema();
+ private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+ .setName("ByteBuffer")
+ .setType(SchemaType.BYTEBUFFER)
+ .setSchema(new byte[0]);
+
+ @Override
+ public byte[] encode(ByteBuffer data) {
+ if (data == null) {
+ return null;
+ }
+
+ data.rewind();
+
+ if (data.hasArray()) {
+ byte[] arr = data.array();
+ if (data.arrayOffset() == 0 && arr.length == data.remaining()) {
+ return arr;
+ }
+ }
+
+ byte[] ret = new byte[data.remaining()];
+ data.get(ret, 0, ret.length);
+ data.rewind();
+ return ret;
+ }
+
+ @Override
+ public ByteBuffer decode(byte[] data) {
+ if (null == data) {
+ return null;
+ } else {
+ return ByteBuffer.wrap(data);
+ }
+ }
+
+ @Override
+ public SchemaInfo getSchemaInfo() {
+ return SCHEMA_INFO;
+ }
+}
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
similarity index 53%
copy from pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
copy to pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
index 4eca216..da82216 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
@@ -19,40 +19,48 @@
package org.apache.pulsar.client.impl.schema;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-
/**
- * Schema definition for Strings encoded in UTF-8 format.
+ * A schema for 'Byte'.
*/
-public class StringSchema implements Schema<String> {
- private final Charset charset;
- private static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
+public class ByteSchema implements Schema<Byte> {
- public StringSchema() {
- this.charset = DEFAULT_CHARSET;
+ public static ByteSchema of() {
+ return INSTANCE;
}
- public StringSchema(Charset charset) {
- this.charset = charset;
- }
+ private static final ByteSchema INSTANCE = new ByteSchema();
+ private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+ .setName("INT8")
+ .setType(SchemaType.INT8)
+ .setSchema(new byte[0]);
+
- public byte[] encode(String message) {
- return message.getBytes(charset);
+ @Override
+ public byte[] encode(Byte message) {
+ if (null == message) {
+ return null;
+ } else {
+ return new byte[]{message};
+ }
}
- public String decode(byte[] bytes) {
- return new String(bytes, charset);
+ @Override
+ public Byte decode(byte[] bytes) {
+ if (null == bytes) {
+ return null;
+ }
+ if (bytes.length != 1) {
+ throw new SchemaSerializationException("Size of data received by ByteSchema is not 1");
+ }
+ return bytes[0];
}
+ @Override
public SchemaInfo getSchemaInfo() {
- SchemaInfo schemaInfo = new SchemaInfo();
- schemaInfo.setName("String");
- schemaInfo.setType(SchemaType.STRING);
- schemaInfo.setSchema(new byte[0]);
- return schemaInfo;
+ return SCHEMA_INFO;
}
}
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
index 042f04c..9a94fcd 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
@@ -20,8 +20,23 @@ package org.apache.pulsar.client.impl.schema;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+/**
+ * A schema for byte arrays.
+ */
public class BytesSchema implements Schema<byte[]> {
+
+ public static BytesSchema of() {
+ return INSTANCE;
+ }
+
+ private static final BytesSchema INSTANCE = new BytesSchema();
+ private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+ .setName("Bytes")
+ .setType(SchemaType.BYTES)
+ .setSchema(new byte[0]);
+
@Override
public byte[] encode(byte[] message) {
return message;
@@ -34,6 +49,6 @@ public class BytesSchema implements Schema<byte[]> {
@Override
public SchemaInfo getSchemaInfo() {
- return null;
+ return SCHEMA_INFO;
}
}
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
new file mode 100644
index 0000000..8ffd9d3
--- /dev/null
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * A schema for `Double`.
+ */
+public class DoubleSchema implements Schema<Double> {
+
+ public static DoubleSchema of() {
+ return INSTANCE;
+ }
+
+ private static final DoubleSchema INSTANCE = new DoubleSchema();
+ private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+ .setName("Double")
+ .setType(SchemaType.DOUBLE)
+ .setSchema(new byte[0]);
+
+ @Override
+ public byte[] encode(Double message) {
+ if (null == message) {
+ return null;
+ } else {
+ long bits = Double.doubleToLongBits(message);
+ return new byte[] {
+ (byte) (bits >>> 56),
+ (byte) (bits >>> 48),
+ (byte) (bits >>> 40),
+ (byte) (bits >>> 32),
+ (byte) (bits >>> 24),
+ (byte) (bits >>> 16),
+ (byte) (bits >>> 8),
+ (byte) bits
+ };
+ }
+ }
+
+ @Override
+ public Double decode(byte[] bytes) {
+ if (null == bytes) {
+ return null;
+ }
+ if (bytes.length != 8) {
+ throw new SchemaSerializationException("Size of data received by DoubleSchema is not 8");
+ }
+ long value = 0;
+ for (byte b : bytes) {
+ value <<= 8;
+ value |= b & 0xFF;
+ }
+ return Double.longBitsToDouble(value);
+ }
+
+ @Override
+ public SchemaInfo getSchemaInfo() {
+ return SCHEMA_INFO;
+ }
+}
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
new file mode 100644
index 0000000..b7c61fb
--- /dev/null
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * A schema for `Float`.
+ */
+public class FloatSchema implements Schema<Float> {
+
+ public static FloatSchema of() {
+ return INSTANCE;
+ }
+
+ private static final FloatSchema INSTANCE = new FloatSchema();
+ private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+ .setName("Float")
+ .setType(SchemaType.FLOAT)
+ .setSchema(new byte[0]);
+
+ @Override
+ public byte[] encode(Float message) {
+ if (null == message) {
+ return null;
+ } else {
+ long bits = Float.floatToRawIntBits(message);
+ return new byte[] {
+ (byte) (bits >>> 24),
+ (byte) (bits >>> 16),
+ (byte) (bits >>> 8),
+ (byte) bits
+ };
+ }
+ }
+
+ @Override
+ public Float decode(byte[] bytes) {
+ if (null == bytes) {
+ return null;
+ }
+ if (bytes.length != 4) {
+ throw new SchemaSerializationException("Size of data received by FloatSchema is not 4");
+ }
+ int value = 0;
+ for (byte b : bytes) {
+ value <<= 8;
+ value |= b & 0xFF;
+ }
+ return Float.intBitsToFloat(value);
+ }
+
+ @Override
+ public SchemaInfo getSchemaInfo() {
+ return SCHEMA_INFO;
+ }
+}
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
new file mode 100644
index 0000000..33bd73b
--- /dev/null
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * A schema for `Integer`.
+ */
+public class IntSchema implements Schema<Integer> {
+
+ public static IntSchema of() {
+ return INSTANCE;
+ }
+
+ private static final IntSchema INSTANCE = new IntSchema();
+ private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+ .setName("INT32")
+ .setType(SchemaType.INT32)
+ .setSchema(new byte[0]);
+
+ @Override
+ public byte[] encode(Integer message) {
+ if (null == message) {
+ return null;
+ } else {
+ return new byte[] {
+ (byte) (message >>> 24),
+ (byte) (message >>> 16),
+ (byte) (message >>> 8),
+ message.byteValue()
+ };
+ }
+ }
+
+ @Override
+ public Integer decode(byte[] bytes) {
+ if (null == bytes) {
+ return null;
+ }
+ if (bytes.length != 4) {
+ throw new SchemaSerializationException("Size of data received by IntSchema is not 4");
+ }
+ int value = 0;
+ for (byte b : bytes) {
+ value <<= 8;
+ value |= b & 0xFF;
+ }
+ return value;
+ }
+
+ @Override
+ public SchemaInfo getSchemaInfo() {
+ return SCHEMA_INFO;
+ }
+}
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
new file mode 100644
index 0000000..e82a901
--- /dev/null
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * A schema for `Long`.
+ */
+public class LongSchema implements Schema<Long> {
+
+ public static LongSchema of() {
+ return INSTANCE;
+ }
+
+ private static final LongSchema INSTANCE = new LongSchema();
+ private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+ .setName("INT64")
+ .setType(SchemaType.INT64)
+ .setSchema(new byte[0]);
+
+ @Override
+ public byte[] encode(Long data) {
+ if (null == data) {
+ return null;
+ } else {
+ return new byte[] {
+ (byte) (data >>> 56),
+ (byte) (data >>> 48),
+ (byte) (data >>> 40),
+ (byte) (data >>> 32),
+ (byte) (data >>> 24),
+ (byte) (data >>> 16),
+ (byte) (data >>> 8),
+ data.byteValue()
+ };
+ }
+ }
+
+ @Override
+ public Long decode(byte[] bytes) {
+ if (null == bytes) {
+ return null;
+ }
+ if (bytes.length != 8) {
+ throw new SchemaSerializationException("Size of data received by LongSchema is not 8");
+ }
+ long value = 0L;
+ for (byte b : bytes) {
+ value <<= 8;
+ value |= b & 0xFF;
+ }
+ return value;
+ }
+
+ @Override
+ public SchemaInfo getSchemaInfo() {
+ return SCHEMA_INFO;
+ }
+}
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
new file mode 100644
index 0000000..fc73b89
--- /dev/null
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * A schema for `Short`.
+ */
+public class ShortSchema implements Schema<Short> {
+
+ public static ShortSchema of() {
+ return INSTANCE;
+ }
+
+ private static final ShortSchema INSTANCE = new ShortSchema();
+ private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+ .setName("INT16")
+ .setType(SchemaType.INT16)
+ .setSchema(new byte[0]);
+
+ @Override
+ public byte[] encode(Short message) {
+ if (null == message) {
+ return null;
+ } else {
+ return new byte[] {
+ (byte) (message >>> 8),
+ message.byteValue()
+ };
+ }
+ }
+
+ @Override
+ public Short decode(byte[] bytes) {
+ if (null == bytes) {
+ return null;
+ }
+ if (bytes.length != 2) {
+ throw new SchemaSerializationException("Size of data received by ShortSchema is not 2");
+ }
+ short value = 0;
+ for (byte b : bytes) {
+ value <<= 8;
+ value |= b & 0xFF;
+ }
+ return value;
+ }
+
+ @Override
+ public SchemaInfo getSchemaInfo() {
+ return SCHEMA_INFO;
+ }
+}
diff --git a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
index 4eca216..11b5c5f 100644
--- a/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
+++ b/pulsar-client-schema/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
@@ -29,6 +29,17 @@ import java.nio.charset.StandardCharsets;
* Schema definition for Strings encoded in UTF-8 format.
*/
public class StringSchema implements Schema<String> {
+
+ public static StringSchema utf8() {
+ return UTF8;
+ }
+
+ private static final StringSchema UTF8 = new StringSchema(StandardCharsets.UTF_8);
+ private static final SchemaInfo SCHEMA_INFO = new SchemaInfo()
+ .setName("String")
+ .setType(SchemaType.STRING)
+ .setSchema(new byte[0]);
+
private final Charset charset;
private static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
@@ -41,18 +52,22 @@ public class StringSchema implements Schema<String> {
}
public byte[] encode(String message) {
- return message.getBytes(charset);
+ if (null == message) {
+ return null;
+ } else {
+ return message.getBytes(charset);
+ }
}
public String decode(byte[] bytes) {
- return new String(bytes, charset);
+ if (null == bytes) {
+ return null;
+ } else {
+ return new String(bytes, charset);
+ }
}
public SchemaInfo getSchemaInfo() {
- SchemaInfo schemaInfo = new SchemaInfo();
- schemaInfo.setName("String");
- schemaInfo.setType(SchemaType.STRING);
- schemaInfo.setSchema(new byte[0]);
- return schemaInfo;
+ return SCHEMA_INFO;
}
}
diff --git a/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/PrimitiveSchemaTest.java b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/PrimitiveSchemaTest.java
new file mode 100644
index 0000000..fdac539
--- /dev/null
+++ b/pulsar-client-schema/src/test/java/org/apache/pulsar/client/schema/PrimitiveSchemaTest.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.schema;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.ByteBufferSchema;
+import org.apache.pulsar.client.impl.schema.ByteSchema;
+import org.apache.pulsar.client.impl.schema.BytesSchema;
+import org.apache.pulsar.client.impl.schema.DoubleSchema;
+import org.apache.pulsar.client.impl.schema.FloatSchema;
+import org.apache.pulsar.client.impl.schema.IntSchema;
+import org.apache.pulsar.client.impl.schema.LongSchema;
+import org.apache.pulsar.client.impl.schema.ShortSchema;
+import org.apache.pulsar.client.impl.schema.StringSchema;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for primitive schemas.
+ */
+@Slf4j
+public class PrimitiveSchemaTest {
+
+ final private Map<Schema, List<Object>> testData = new HashMap() {
+ {
+ put(StringSchema.utf8(), Arrays.asList("my string"));
+ put(ByteSchema.of(), Arrays.asList((byte) 32767, (byte) -32768));
+ put(ShortSchema.of(), Arrays.asList((short) 32767, (short) -32768));
+ put(IntSchema.of(), Arrays.asList((int) 423412424, (int) -41243432));
+ put(LongSchema.of(), Arrays.asList(922337203685477580L, -922337203685477581L));
+ put(FloatSchema.of(), Arrays.asList(5678567.12312f, -5678567.12341f));
+ put(DoubleSchema.of(), Arrays.asList(5678567.12312d, -5678567.12341d));
+ put(BytesSchema.of(), Arrays.asList("my string".getBytes()));
+ put(ByteBufferSchema.of(), Arrays.asList(ByteBuffer.allocate(10).put("my string".getBytes())));
+ }
+ };
+
+ @Test
+ public void allSchemasShouldSupportNull() {
+ for (Schema<?> schema : testData.keySet()) {
+ assertNull(schema.encode(null),
+ "Should support null in " + schema.getSchemaInfo().getName() + " serialization");
+ assertNull(schema.decode( null),
+ "Should support null in " + schema.getSchemaInfo().getName() + " deserialization");
+ }
+ }
+
+ @Test
+ public void allSchemasShouldRoundtripInput() {
+ for (Map.Entry<Schema, List<Object>> test : testData.entrySet()) {
+ log.info("Test schema {}", test.getKey());
+ for (Object value : test.getValue()) {
+ log.info("Encode : {}", value);
+ assertEquals(value,
+ test.getKey().decode(test.getKey().encode(value)),
+ "Should get the original " + test.getKey().getSchemaInfo().getName() +
+ " after serialization and deserialization");
+ }
+ }
+ }
+
+ @Test
+ public void allSchemasShouldHaveSchemaType() {
+ assertEquals(SchemaType.INT8, ByteSchema.of().getSchemaInfo().getType());
+ assertEquals(SchemaType.INT16, ShortSchema.of().getSchemaInfo().getType());
+ assertEquals(SchemaType.INT32, IntSchema.of().getSchemaInfo().getType());
+ assertEquals(SchemaType.INT64, LongSchema.of().getSchemaInfo().getType());
+ assertEquals(SchemaType.FLOAT, FloatSchema.of().getSchemaInfo().getType());
+ assertEquals(SchemaType.DOUBLE, DoubleSchema.of().getSchemaInfo().getType());
+ assertEquals(SchemaType.STRING, StringSchema.utf8().getSchemaInfo().getType());
+ assertEquals(SchemaType.BYTES, BytesSchema.of().getSchemaInfo().getType());
+ assertEquals(SchemaType.BYTEBUFFER, ByteBufferSchema.of().getSchemaInfo().getType());
+
+ }
+
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b7f9918..f1db613 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -66,6 +66,7 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
import org.apache.pulsar.common.api.PulsarDecoder;
@@ -82,7 +83,8 @@ import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.client.api.DeadLetterPolicy;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -535,8 +537,13 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
builder.recycle();
}
+ SchemaInfo si = schema.getSchemaInfo();
+ if (SchemaType.BYTES == si.getType()) {
+ // don't set schema for Schema.BYTES
+ si = null;
+ }
ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel,
- consumerName, isDurable, startMessageIdData, metadata, readCompacted, InitialPosition.valueOf(subscriptionInitialPosition.getValue()), schema.getSchemaInfo());
+ consumerName, isDurable, startMessageIdData, metadata, readCompacted, InitialPosition.valueOf(subscriptionInitialPosition.getValue()), si);
if (startMessageIdData != null) {
startMessageIdData.recycle();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 9401e7c..8551510 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -900,6 +900,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
JSONSchema jsonSchema = (JSONSchema) schema;
schemaInfo = jsonSchema.getBackwardsCompatibleJsonSchemaInfo();
}
+ } else if (schema.getSchemaInfo().getType() == SchemaType.BYTES) {
+ // don't set schema info for Schema.BYTES
+ schemaInfo = null;
} else {
schemaInfo = schema.getSchemaInfo();
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
index 9721444..279b628 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
@@ -27,6 +27,7 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
+import lombok.experimental.Accessors;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
import org.apache.pulsar.common.api.proto.PulsarApi.Schema;
@@ -34,6 +35,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.Schema;
@Data
@AllArgsConstructor
@NoArgsConstructor
+@Accessors(chain = true)
public class SchemaInfo {
@EqualsAndHashCode.Exclude
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
index 88adb53..87c0956 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
@@ -33,6 +33,46 @@ public enum SchemaType {
STRING,
/**
+ * An 8-bit integer.
+ */
+ INT8,
+
+ /**
+ * A 16-bit integer.
+ */
+ INT16,
+
+ /**
+ * A 32-bit integer.
+ */
+ INT32,
+
+ /**
+ * A 64-bit integer.
+ */
+ INT64,
+
+ /**
+ * A float number.
+ */
+ FLOAT,
+
+ /**
+ * A double number.
+ */
+ DOUBLE,
+
+ /**
+ * A byte array.
+ */
+ BYTES,
+
+ /**
+ * A byte buffer.
+ */
+ BYTEBUFFER,
+
+ /**
* JSON object encoding and validation
*/
JSON,