You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/06/20 05:30:46 UTC
[pulsar] branch master updated: [schema] key/value schema
enhancement (#4548)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 82d9e71 [schema] key/value schema enhancement (#4548)
82d9e71 is described below
commit 82d9e7160734f0e3b666085b00946e655d4eccf8
Author: Sijie Guo <si...@apache.org>
AuthorDate: Wed Jun 19 22:30:40 2019 -0700
[schema] key/value schema enhancement (#4548)
*Motivation*
- The code for encoding and decoding key/value schemas is spread across multiple places.
- Make code changes to prepare for supporting key/value schemas in AUTO consumers
- Make schema tools display key/value schema in a pretty format
*Modifications*
- Move the common logic of encoding and decoding key/value schema to a common class KeyValueSchemaInfo
- Expose the common class in DefaultImplementation so that it can be available for public usage
- Fix the problem with displaying key/value schemas
*Verify this change*
- Add a bunch of unit tests for key/value schemas
---
.../schema/KeyValueSchemaCompatibilityCheck.java | 57 +----
.../KeyValueSchemaCompatibilityCheckTest.java | 8 -
.../pulsar/client/api/schema/SchemaWriter.java | 4 +
.../client/internal/DefaultImplementation.java | 91 ++++++-
.../pulsar/client/internal/ReflectionUtils.java | 10 +
.../org/apache/pulsar/common/schema/KeyValue.java | 85 +++++++
.../apache/pulsar/common/schema/SchemaInfo.java | 11 +
.../org/apache/pulsar/admin/cli/CmdSchemas.java | 24 +-
.../pulsar/client/impl/schema/KeyValueSchema.java | 102 ++------
.../client/impl/schema/KeyValueSchemaInfo.java | 221 ++++++++++++++++
.../pulsar/client/impl/schema/SchemaUtils.java | 157 ++++++++++++
.../client/impl/schema/KeyValueSchemaInfoTest.java | 210 ++++++++++++++++
.../pulsar/client/impl/schema/SchemaInfoTest.java | 279 +++++++++++++++++++++
.../apache/pulsar/common/schema/KeyValueTest.java | 122 +++++++++
.../pulsar/common/protocol/schema/SchemaData.java | 30 +++
.../pulsar/tests/integration/cli/CLITest.java | 2 +-
16 files changed, 1247 insertions(+), 166 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheck.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheck.java
index b7cf5b4..2426f3e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheck.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheck.java
@@ -18,12 +18,12 @@
*/
package org.apache.pulsar.broker.service.schema;
-import com.google.gson.Gson;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
-import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
@@ -39,48 +39,13 @@ public class KeyValueSchemaCompatibilityCheck implements SchemaCompatibilityChec
this.checkers = checkers;
}
- private KeyValue<byte[], byte[]> splitKeyValueSchema(byte[] bytes) {
- ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
- int keyLength = byteBuffer.getInt();
- byte[] keySchema = new byte[keyLength];
- byteBuffer.get(keySchema);
-
- int valueLength = byteBuffer.getInt();
- byte[] valueSchema = new byte[valueLength];
- byteBuffer.get(valueSchema);
- return new KeyValue<>(keySchema, valueSchema);
- }
-
- private SchemaType fetchSchemaType(Map<String, String> properties, String key) {
- if (properties.get(key) != null) {
- return SchemaType.valueOf(properties.get(key));
- }
- return SchemaType.BYTES;
- }
-
- private SchemaData fetchSchemaData(
- byte[] keyValue, SchemaType schemaType, Gson schemaGson, Map<String, String> properties, String key) {
- if (properties.get(key) != null) {
- return SchemaData.builder().data(keyValue)
- .type(schemaType)
- .props(schemaGson.fromJson(properties.get(key), Map.class)).build();
- }
- return SchemaData.builder().data(keyValue)
- .type(schemaType)
- .props(Collections.emptyMap()).build();
- }
-
- private KeyValue<SchemaData, SchemaData> splitKeyValueSchemaData(SchemaData schemaData) {
- KeyValue<byte[], byte[]> keyValue = this.splitKeyValueSchema(schemaData.getData());
- Map<String, String> properties = schemaData.getProps();
- SchemaType keyType = fetchSchemaType(properties, "key.schema.type");
- SchemaType valueType = fetchSchemaType(properties, "value.schema.type");
- Gson schemaGson = new Gson();
- SchemaData keySchemaData = fetchSchemaData(
- keyValue.getKey(), keyType, schemaGson, properties, "key.schema.properties");
- SchemaData valueSchemaData = fetchSchemaData(
- keyValue.getValue(), valueType, schemaGson, properties, "value.schema.properties");
- return new KeyValue<>(keySchemaData, valueSchemaData);
+ private KeyValue<SchemaData, SchemaData> decodeKeyValueSchemaData(SchemaData schemaData) {
+ KeyValue<SchemaInfo, SchemaInfo> schemaInfoKeyValue =
+ KeyValueSchemaInfo.decodeKeyValueSchemaInfo(schemaData.toSchemaInfo());
+ return new KeyValue<>(
+ SchemaData.fromSchemaInfo(schemaInfoKeyValue.getKey()),
+ SchemaData.fromSchemaInfo(schemaInfoKeyValue.getValue())
+ );
}
@Override
@@ -104,7 +69,7 @@ public class KeyValueSchemaCompatibilityCheck implements SchemaCompatibilityChec
LinkedList<SchemaData> fromKeyList = new LinkedList<>();
LinkedList<SchemaData> fromValueList = new LinkedList<>();
KeyValue<SchemaData, SchemaData> fromKeyValue;
- KeyValue<SchemaData, SchemaData> toKeyValue = splitKeyValueSchemaData(to);
+ KeyValue<SchemaData, SchemaData> toKeyValue = decodeKeyValueSchemaData(to);
SchemaType toKeyType = toKeyValue.getKey().getType();
SchemaType toValueType = toKeyValue.getValue().getType();
@@ -112,7 +77,7 @@ public class KeyValueSchemaCompatibilityCheck implements SchemaCompatibilityChec
if (schemaData.getType() != SchemaType.KEY_VALUE) {
return false;
}
- fromKeyValue = splitKeyValueSchemaData(schemaData);
+ fromKeyValue = decodeKeyValueSchemaData(schemaData);
if (fromKeyValue.getKey().getType() != toKeyType || fromKeyValue.getValue().getType() != toValueType) {
return false;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheckTest.java
index b3d4716..0755781 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheckTest.java
@@ -441,13 +441,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
Map<String, String> fromProperties = Maps.newHashMap();
fromProperties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
fromProperties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
- fromProperties.put("key.schema.properties", null);
- fromProperties.put("value.schema.properties", null);
Map<String, String> toProperties = Maps.newHashMap();
toProperties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
toProperties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
- toProperties.put("key.schema.properties", null);
- toProperties.put("value.schema.properties", null);
SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
.data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(fromProperties).build();
SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
@@ -460,11 +456,7 @@ public class KeyValueSchemaCompatibilityCheckTest {
AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
Map<String, String> fromProperties = Maps.newHashMap();
- fromProperties.put("key.schema.type", null);
- fromProperties.put("value.schema.type", null);
Map<String, String> toProperties = Maps.newHashMap();
- toProperties.put("key.schema.type", null);
- toProperties.put("value.schema.type", null);
SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
.data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(fromProperties).build();
SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaWriter.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaWriter.java
index a93739a..65576d3 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaWriter.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaWriter.java
@@ -18,6 +18,10 @@
*/
package org.apache.pulsar.client.api.schema;
+/**
+ * Serialize messages into bytes.
+ */
+@FunctionalInterface
public interface SchemaWriter<T> {
/**
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
index 80282bf..c8418b9 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
@@ -41,7 +41,11 @@ import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
-import org.apache.pulsar.client.api.schema.*;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.api.schema.SchemaDefinitionBuilder;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
@@ -284,6 +288,91 @@ public class DefaultImplementation {
String.class).newInstance(name));
}
+ /**
+ * Decode the kv encoding type from the schema info.
+ *
+ * @param schemaInfo the schema info
+ * @return the kv encoding type
+ */
+ public static KeyValueEncodingType decodeKeyValueEncodingType(SchemaInfo schemaInfo) {
+ return catchExceptions(
+ () -> (KeyValueEncodingType) getStaticMethod("org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo",
+ "decodeKeyValueEncodingType", SchemaInfo.class
+ ).invoke(null, schemaInfo));
+ }
+
+ /**
+ * Encode key & value into schema into a KeyValue schema.
+ *
+ * @param keySchema the key schema
+ * @param valueSchema the value schema
+ * @param keyValueEncodingType the encoding type to encode and decode key value pair
+ * @return the final schema info
+ */
+ public static <K, V> SchemaInfo encodeKeyValueSchemaInfo(Schema<K> keySchema,
+ Schema<V> valueSchema,
+ KeyValueEncodingType keyValueEncodingType) {
+ return encodeKeyValueSchemaInfo("KeyValue", keySchema, valueSchema, keyValueEncodingType);
+ }
+
+ /**
+ * Encode key & value into schema into a KeyValue schema.
+ *
+ * @param schemaName the final schema name
+ * @param keySchema the key schema
+ * @param valueSchema the value schema
+ * @param keyValueEncodingType the encoding type to encode and decode key value pair
+ * @return the final schema info
+ */
+ public static <K, V> SchemaInfo encodeKeyValueSchemaInfo(String schemaName,
+ Schema<K> keySchema,
+ Schema<V> valueSchema,
+ KeyValueEncodingType keyValueEncodingType) {
+ return catchExceptions(
+ () -> (SchemaInfo) getStaticMethod("org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo",
+ "encodeKeyValueSchemaInfo", String.class, Schema.class, Schema.class, KeyValueEncodingType.class
+ ).invoke(null, schemaName, keySchema, valueSchema, keyValueEncodingType));
+ }
+
+ /**
+ * Decode the key/value schema info to get key schema info and value schema info.
+ *
+ * @param schemaInfo key/value schema info.
+ * @return the pair of key schema info and value schema info
+ */
+ public static KeyValue<SchemaInfo, SchemaInfo> decodeKeyValueSchemaInfo(SchemaInfo schemaInfo) {
+ return catchExceptions(
+ () -> (KeyValue<SchemaInfo, SchemaInfo>) getStaticMethod("org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo",
+ "decodeKeyValueSchemaInfo", SchemaInfo.class
+ ).invoke(null, schemaInfo));
+ }
+
+ /**
+ * Jsonify the schema info.
+ *
+ * @param schemaInfo the schema info
+ * @return the jsonified schema info
+ */
+ public static String jsonifySchemaInfo(SchemaInfo schemaInfo) {
+ return catchExceptions(
+ () -> (String) getStaticMethod("org.apache.pulsar.client.impl.schema.SchemaUtils",
+ "jsonifySchemaInfo", SchemaInfo.class
+ ).invoke(null, schemaInfo));
+ }
+
+ /**
+ * Jsonify the key/value schema info.
+ *
+ * @param kvSchemaInfo the key/value schema info
+ * @return the jsonified schema info
+ */
+ public static String jsonifyKeyValueSchemaInfo(KeyValue<SchemaInfo, SchemaInfo> kvSchemaInfo) {
+ return catchExceptions(
+ () -> (String) getStaticMethod("org.apache.pulsar.client.impl.schema.SchemaUtils",
+ "jsonifyKeyValueSchemaInfo", KeyValue.class
+ ).invoke(null, kvSchemaInfo));
+ }
+
public static BatcherBuilder newDefaultBatcherBuilder() {
return catchExceptions(
() -> (BatcherBuilder) getConstructor("org.apache.pulsar.client.impl.DefaultBatcherBuilder")
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/ReflectionUtils.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/ReflectionUtils.java
index 2b4764c..4d9db6e 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/ReflectionUtils.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/ReflectionUtils.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.internal;
import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import lombok.experimental.UtilityClass;
@@ -33,6 +34,15 @@ class ReflectionUtils {
try {
return s.get();
} catch (Throwable t) {
+ if (t instanceof InvocationTargetException) {
+ // exception is thrown during invocation
+ Throwable cause = t.getCause();
+ if (cause instanceof RuntimeException) {
+ throw (RuntimeException) cause;
+ } else {
+ throw new RuntimeException(cause);
+ }
+ }
throw new RuntimeException(t);
}
}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValue.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValue.java
index f95fe7d..3f66ff3 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValue.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/KeyValue.java
@@ -18,6 +18,10 @@
*/
package org.apache.pulsar.common.schema;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import org.apache.pulsar.client.api.Schema;
+
/**
* A simple KeyValue class
*/
@@ -37,4 +41,85 @@ public class KeyValue<K, V> {
public V getValue() {
return value;
}
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key, value);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof KeyValue)) {
+ return false;
+ }
+ KeyValue<K, V> another = (KeyValue<K, V>) obj;
+ return Objects.equals(key, another.key)
+ && Objects.equals(value, another.value);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("(key = \"")
+ .append(key)
+ .append("\", value = \"")
+ .append(value)
+ .append("\")");
+ return sb.toString();
+ }
+
+ /**
+ * Decoder to decode key/value bytes.
+ */
+ @FunctionalInterface
+ public interface KeyValueDecoder<K, V> {
+
+ /**
+ * Decode key and value bytes into a {@link KeyValue} pair.
+ *
+ * @param keyData key data
+ * @param valueData value data
+ * @return the decoded {@link KeyValue} pair
+ */
+ KeyValue<K, V> decode(byte[] keyData, byte[] valueData);
+
+ }
+
+ /**
+ * Encode a <tt>key</tt> and <tt>value</tt> pair into a bytes array.
+ *
+ * @param key key object to encode
+ * @param keyWriter a writer to encode key object
+ * @param value value object to encode
+ * @param valueWriter a writer to encode value object
+ * @return the encoded bytes array
+ */
+ public static <K, V> byte[] encode(K key, Schema<K> keyWriter,
+ V value, Schema<V> valueWriter) {
+ byte [] keyBytes = keyWriter.encode(key);
+ byte [] valueBytes = valueWriter.encode(value);
+ ByteBuffer byteBuffer = ByteBuffer.allocate(4 + keyBytes.length + 4 + valueBytes.length);
+ byteBuffer.putInt(keyBytes.length).put(keyBytes).putInt(valueBytes.length).put(valueBytes);
+ return byteBuffer.array();
+ }
+
+ /**
+ * Decode the value into a key/value pair.
+ *
+ * @param data the encoded bytes
+ * @param decoder the decoder to decode encoded key/value bytes
+ * @return the decoded key/value pair
+ */
+ public static <K, V> KeyValue<K, V> decode(byte[] data, KeyValueDecoder<K, V> decoder) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(data);
+ int keyLength = byteBuffer.getInt();
+ byte[] keyBytes = new byte[keyLength];
+ byteBuffer.get(keyBytes);
+
+ int valueLength = byteBuffer.getInt();
+ byte[] valueBytes = new byte[valueLength];
+ byteBuffer.get(valueBytes);
+
+ return decoder.decode(keyBytes, valueBytes);
+ }
}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
index c5d1b72..7b07632 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
@@ -30,6 +30,7 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
+import org.apache.pulsar.client.internal.DefaultImplementation;
@Data
@AllArgsConstructor
@@ -66,8 +67,18 @@ public class SchemaInfo {
case JSON:
case PROTOBUF:
return new String(schema, UTF_8);
+ case KEY_VALUE:
+ KeyValue<SchemaInfo, SchemaInfo> schemaInfoKeyValue =
+ DefaultImplementation.decodeKeyValueSchemaInfo(this);
+ return DefaultImplementation.jsonifyKeyValueSchemaInfo(schemaInfoKeyValue);
default:
return Base64.getEncoder().encodeToString(schema);
}
}
+
+ @Override
+ public String toString(){
+ return DefaultImplementation.jsonifySchemaInfo(this);
+ }
+
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
index 5d71035..9973df8 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSchemas.java
@@ -22,16 +22,8 @@ import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
-import java.lang.reflect.Type;
import java.net.URL;
import java.net.URLClassLoader;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonPrimitive;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
import org.apache.pulsar.admin.cli.utils.SchemaExtractor;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
@@ -66,21 +58,7 @@ public class CmdSchemas extends CmdBase {
} else {
schemaInfo = admin.schemas().getSchemaInfo(topic, version);
}
- Gson customGson = new GsonBuilder().registerTypeHierarchyAdapter(byte[].class,
- new ByteArrayToStringAdapter(schemaInfo)).create();
- System.out.println(customGson.toJson(schemaInfo));
- }
- }
-
- // Using Android's base64 libraries. This can be replaced with any base64 library.
- private class ByteArrayToStringAdapter implements JsonSerializer<byte[]> {
- private SchemaInfo schemaInfo;
- public ByteArrayToStringAdapter(SchemaInfo schemaInfo) {
- this.schemaInfo = schemaInfo;
- }
-
- public JsonElement serialize(byte[] src, Type typeOfSrc, JsonSerializationContext context) {
- return new JsonPrimitive(schemaInfo.getSchemaDefinition());
+ System.out.println(schemaInfo);
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
index 066d3b3..7eb4757 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
@@ -19,16 +19,8 @@
package org.apache.pulsar.client.impl.schema;
import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Map;
-
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
import lombok.Getter;
-
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
@@ -103,13 +95,17 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
KeyValueEncodingType keyValueEncodingType) {
this.keySchema = keySchema;
this.valueSchema = valueSchema;
+ this.keyValueEncodingType = keyValueEncodingType;
+ this.schemaInfo = KeyValueSchemaInfo.encodeKeyValueSchemaInfo(
+ keySchema, valueSchema, keyValueEncodingType
+ );
if (keySchema instanceof StructSchema) {
- ((StructSchema) keySchema).setSchemaInfoProvider(new SchemaInfoProvider() {
+ keySchema.setSchemaInfoProvider(new SchemaInfoProvider() {
@Override
public SchemaInfo getSchemaByVersion(byte[] schemaVersion) {
SchemaInfo versionSchemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
- return decodeKeyValueSchemaInfo(versionSchemaInfo).getKey();
+ return KeyValueSchemaInfo.decodeKeyValueSchemaInfo(versionSchemaInfo).getKey();
}
@Override
@@ -125,11 +121,11 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
}
if (valueSchema instanceof StructSchema) {
- ((StructSchema) valueSchema).setSchemaInfoProvider(new SchemaInfoProvider() {
+ valueSchema.setSchemaInfoProvider(new SchemaInfoProvider() {
@Override
public SchemaInfo getSchemaByVersion(byte[] schemaVersion) {
SchemaInfo versionSchemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
- return decodeKeyValueSchemaInfo(versionSchemaInfo).getValue();
+ return KeyValueSchemaInfo.decodeKeyValueSchemaInfo(versionSchemaInfo).getValue();
}
@Override
@@ -144,35 +140,6 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
});
}
- // set schemaInfo
- this.schemaInfo = new SchemaInfo()
- .setName("KeyValue")
- .setType(SchemaType.KEY_VALUE);
-
- byte[] keySchemaInfo = keySchema.getSchemaInfo().getSchema();
- byte[] valueSchemaInfo = valueSchema.getSchemaInfo().getSchema();
-
- ByteBuffer byteBuffer = ByteBuffer.allocate(4 + keySchemaInfo.length + 4 + valueSchemaInfo.length);
- byteBuffer.putInt(keySchemaInfo.length).put(keySchemaInfo)
- .putInt(valueSchemaInfo.length).put(valueSchemaInfo);
-
- Map<String, String> properties = Maps.newHashMap();
-
- properties.put("key.schema.name", keySchema.getSchemaInfo().getName());
- properties.put("key.schema.type", String.valueOf(keySchema.getSchemaInfo().getType()));
- Gson keySchemaGson = new Gson();
- properties.put("key.schema.properties", keySchemaGson.toJson(keySchema.getSchemaInfo().getProperties()));
- properties.put("value.schema.name", valueSchema.getSchemaInfo().getName());
- properties.put("value.schema.type", String.valueOf(valueSchema.getSchemaInfo().getType()));
- Gson valueSchemaGson = new Gson();
- properties.put("value.schema.properties", valueSchemaGson.toJson(valueSchema.getSchemaInfo().getProperties()));
-
- checkNotNull(keyValueEncodingType, "Null encoding type is provided");
- this.keyValueEncodingType = keyValueEncodingType;
- properties.put("kv.encoding.type", String.valueOf(keyValueEncodingType));
-
- this.schemaInfo.setSchema(byteBuffer.array()).setProperties(properties);
-
this.schemaInfoProvider = new SchemaInfoProvider() {
@Override
public SchemaInfo getSchemaByVersion(byte[] schemaVersion) {
@@ -194,11 +161,12 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
// encode as bytes: [key.length][key.bytes][value.length][value.bytes] or [value.bytes]
public byte[] encode(KeyValue<K, V> message) {
if (keyValueEncodingType != null && keyValueEncodingType == KeyValueEncodingType.INLINE) {
- byte [] keyBytes = keySchema.encode(message.getKey());
- byte [] valueBytes = valueSchema.encode(message.getValue());
- ByteBuffer byteBuffer = ByteBuffer.allocate(4 + keyBytes.length + 4 + valueBytes.length);
- byteBuffer.putInt(keyBytes.length).put(keyBytes).putInt(valueBytes.length).put(valueBytes);
- return byteBuffer.array();
+ return KeyValue.encode(
+ message.getKey(),
+ keySchema,
+ message.getValue(),
+ valueSchema
+ );
} else {
return valueSchema.encode(message.getValue());
}
@@ -213,16 +181,8 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
if (this.keyValueEncodingType == KeyValueEncodingType.SEPARATED) {
throw new SchemaSerializationException("This method cannot be used under this SEPARATED encoding type");
}
- ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
- int keyLength = byteBuffer.getInt();
- byte[] keyBytes = new byte[keyLength];
- byteBuffer.get(keyBytes);
- int valueLength = byteBuffer.getInt();
- byte[] valueBytes = new byte[valueLength];
- byteBuffer.get(valueBytes);
-
- return decode(keyBytes, valueBytes, schemaVersion);
+ return KeyValue.decode(bytes, (keyBytes, valueBytes) -> decode(keyBytes, valueBytes, schemaVersion));
}
public KeyValue<K, V> decode(byte[] keyBytes, byte[] valueBytes, byte[] schemaVersion) {
@@ -249,36 +209,4 @@ public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
this.schemaInfoProvider = schemaInfoProvider;
}
- private static KeyValue<SchemaInfo, SchemaInfo> decodeKeyValueSchemaInfo(SchemaInfo schemaInfo) {
- ByteBuffer byteBuffer = ByteBuffer.wrap(schemaInfo.getSchema());
- int keySchemaLength = byteBuffer.getInt();
- byte[] key = new byte[keySchemaLength];
- byteBuffer.get(key);
- int valueSchemaLength = byteBuffer.getInt();
- byte[] value = new byte[valueSchemaLength];
- byteBuffer.get(value);
- Gson keySchemaGson = new Gson();
- Map<String, String> keyProperties = Maps.newHashMap();
- if (schemaInfo.getProperties().get("key.schema.properties") != null) {
- keyProperties = keySchemaGson.fromJson(schemaInfo.getProperties().get("key.schema.properties"), Map.class);
- } else {
- keyProperties = Collections.emptyMap();
- }
- SchemaInfo keySchemaInfo = SchemaInfo.builder().schema(key)
- .properties(keyProperties)
- .name("")
- .type(SchemaType.AVRO).build();
- Gson valueSchemaGson = new Gson();
- Map<String, String> valueProperties = Maps.newHashMap();
- if (schemaInfo.getProperties().get("value.schema.properties") != null) {
- valueProperties = valueSchemaGson.fromJson(schemaInfo.getProperties().get("value.schema.properties"), Map.class);
- } else {
- valueProperties = Collections.emptyMap();
- }
- SchemaInfo valueSchemaInfo = SchemaInfo.builder().schema(value)
- .properties(valueProperties)
- .name("")
- .type(SchemaType.AVRO).build();
- return new KeyValue<>(keySchemaInfo, valueSchemaInfo);
- }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
new file mode 100644
index 0000000..6d54d57
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+/**
+ * Util class for processing key/value schema info.
+ */
+@Slf4j
+public final class KeyValueSchemaInfo {
+
+ private static final Schema<SchemaInfo> SCHEMA_INFO_WRITER = new Schema<SchemaInfo>() { // writer handed to KeyValue.encode for both the key and the value schema info
+ @Override
+ public byte[] encode(SchemaInfo si) {
+ return si.getSchema(); // "encoding" a SchemaInfo simply exposes its raw schema bytes
+ }
+
+ @Override
+ public SchemaInfo getSchemaInfo() {
+ return BytesSchema.BYTES.getSchemaInfo(); // describes itself with the schema info of the BYTES schema
+ }
+ };
+
+ private static final String KEY_SCHEMA_NAME = "key.schema.name"; // parent-schema property: name of the key schema
+ private static final String KEY_SCHEMA_TYPE = "key.schema.type"; // parent-schema property: SchemaType of the key schema (BYTES assumed when absent)
+ private static final String KEY_SCHEMA_PROPS = "key.schema.properties"; // parent-schema property: the key schema's own properties, serialized to a string
+ private static final String VALUE_SCHEMA_NAME = "value.schema.name"; // parent-schema property: name of the value schema
+ private static final String VALUE_SCHEMA_TYPE = "value.schema.type"; // parent-schema property: SchemaType of the value schema (BYTES assumed when absent)
+ private static final String VALUE_SCHEMA_PROPS = "value.schema.properties"; // parent-schema property: the value schema's own properties, serialized to a string
+ private static final String KV_ENCODING_TYPE = "kv.encoding.type"; // parent-schema property: KeyValueEncodingType; absent or empty is read as INLINE
+
+ /**
+  * Read the key/value encoding type recorded in a KeyValue schema info.
+  *
+  * <p>A schema info that carries no (or an empty) {@code kv.encoding.type} property is
+  * treated as {@link KeyValueEncodingType#INLINE}.
+  *
+  * @param schemaInfo the schema info; its type must be {@link SchemaType#KEY_VALUE}
+  * @return the kv encoding type
+  * @throws IllegalArgumentException if the schema info is not a KeyValue schema
+  */
+ public static KeyValueEncodingType decodeKeyValueEncodingType(SchemaInfo schemaInfo) {
+     checkArgument(SchemaType.KEY_VALUE == schemaInfo.getType(), "Not a KeyValue schema");
+
+     final String rawEncodingType = schemaInfo.getProperties().get(KV_ENCODING_TYPE);
+     return StringUtils.isEmpty(rawEncodingType)
+         ? KeyValueEncodingType.INLINE
+         : KeyValueEncodingType.valueOf(rawEncodingType);
+ }
+
+ /**
+  * Encode a key schema and a value schema into a single KeyValue schema info
+  * that uses the default schema name {@code "KeyValue"}.
+  *
+  * @param keySchema the key schema
+  * @param valueSchema the value schema
+  * @param keyValueEncodingType the encoding type to encode and decode key value pair
+  * @return the final schema info
+  */
+ public static <K, V> SchemaInfo encodeKeyValueSchemaInfo(Schema<K> keySchema,
+                                                          Schema<V> valueSchema,
+                                                          KeyValueEncodingType keyValueEncodingType) {
+     final String defaultSchemaName = "KeyValue";
+     return encodeKeyValueSchemaInfo(defaultSchemaName, keySchema, valueSchema, keyValueEncodingType);
+ }
+
+ /**
+  * Encode a key schema and a value schema into a single KeyValue schema info.
+  *
+  * <p>The schema payload packs the key and value schema bytes together via
+  * {@code KeyValue.encode}; the name, type and properties of each sub-schema, plus the
+  * encoding type, are recorded in the properties of the resulting schema info.
+  *
+  * @param schemaName the final schema name
+  * @param keySchema the key schema
+  * @param valueSchema the value schema
+  * @param keyValueEncodingType the encoding type to encode and decode key value pair
+  * @return the final schema info
+  */
+ public static <K, V> SchemaInfo encodeKeyValueSchemaInfo(String schemaName,
+                                                          Schema<K> keySchema,
+                                                          Schema<V> valueSchema,
+                                                          KeyValueEncodingType keyValueEncodingType) {
+     checkNotNull(keyValueEncodingType, "Null encoding type is provided");
+
+     // schema payload: key schema bytes and value schema bytes packed together
+     final SchemaInfo keyInfo = keySchema.getSchemaInfo();
+     final SchemaInfo valueInfo = valueSchema.getSchemaInfo();
+     final byte[] packedSchemas =
+         KeyValue.encode(keyInfo, SCHEMA_INFO_WRITER, valueInfo, SCHEMA_INFO_WRITER);
+
+     // schema properties: describe both sub-schemas, then record the encoding type
+     final Map<String, String> kvProperties = new HashMap<>();
+     encodeSubSchemaInfoToParentSchemaProperties(
+         keySchema, KEY_SCHEMA_NAME, KEY_SCHEMA_TYPE, KEY_SCHEMA_PROPS, kvProperties);
+     encodeSubSchemaInfoToParentSchemaProperties(
+         valueSchema, VALUE_SCHEMA_NAME, VALUE_SCHEMA_TYPE, VALUE_SCHEMA_PROPS, kvProperties);
+     kvProperties.put(KV_ENCODING_TYPE, String.valueOf(keyValueEncodingType));
+
+     // assemble the final schema info
+     return new SchemaInfo()
+         .setName(schemaName)
+         .setType(SchemaType.KEY_VALUE)
+         .setSchema(packedSchemas)
+         .setProperties(kvProperties);
+ }
+
+ private static void encodeSubSchemaInfoToParentSchemaProperties(Schema<?> schema,
+ String schemaNameProperty,
+ String schemaTypeProperty,
+ String schemaPropsProperty,
+ Map<String, String> parentSchemaProperties) {
+ SchemaInfo schemaInfo = schema.getSchemaInfo();
+
+ parentSchemaProperties.put(schemaNameProperty, schemaInfo.getName());
+ parentSchemaProperties.put(schemaTypeProperty, String.valueOf(schemaInfo.getType()));
+ parentSchemaProperties.put(
+ schemaPropsProperty,
+ SchemaUtils.serializeSchemaProperties(schemaInfo.getProperties()));
+ }
+
+ /**
+  * Split a key/value schema info back into the schema info of its key and of its value.
+  *
+  * @param schemaInfo key/value schema info.
+  * @return the pair of key schema info and value schema info
+  * @throws IllegalArgumentException if the schema info is not a KeyValue schema
+  */
+ public static KeyValue<SchemaInfo, SchemaInfo> decodeKeyValueSchemaInfo(SchemaInfo schemaInfo) {
+     checkArgument(SchemaType.KEY_VALUE == schemaInfo.getType(), "Not a KeyValue schema");
+
+     final byte[] packedSchemas = schemaInfo.getSchema();
+     // the key part is rebuilt first, then the value part
+     return KeyValue.decode(packedSchemas, (keyBytes, valueBytes) -> new KeyValue<>(
+         decodeSubSchemaInfo(schemaInfo, KEY_SCHEMA_NAME, KEY_SCHEMA_TYPE, KEY_SCHEMA_PROPS, keyBytes),
+         decodeSubSchemaInfo(schemaInfo, VALUE_SCHEMA_NAME, VALUE_SCHEMA_TYPE, VALUE_SCHEMA_PROPS, valueBytes)));
+ }
+
+ private static SchemaInfo decodeSubSchemaInfo(SchemaInfo parentSchemaInfo,
+ String schemaNameProperty,
+ String schemaTypeProperty,
+ String schemaPropsProperty,
+ byte[] schemaData) {
+ Map<String, String> parentSchemaProps = parentSchemaInfo.getProperties();
+ String schemaName = parentSchemaProps.getOrDefault(schemaNameProperty, "");
+ SchemaType schemaType =
+ SchemaType.valueOf(parentSchemaProps.getOrDefault(schemaTypeProperty, SchemaType.BYTES.name()));
+ Map<String, String> schemaProps;
+ String schemaPropsStr = parentSchemaProps.get(schemaPropsProperty);
+ if (StringUtils.isEmpty(schemaPropsStr)) {
+ schemaProps = Collections.emptyMap();
+ } else {
+ schemaProps = SchemaUtils.deserializeSchemaProperties(schemaPropsStr);
+ }
+ return SchemaInfo.builder()
+ .name(schemaName)
+ .type(schemaType)
+ .schema(schemaData)
+ .properties(schemaProps)
+ .build();
+ }
+
+ private KeyValueSchemaInfo() {} // static-only utility class: the private constructor prevents instantiation
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
index 3ba36b1..83b23c9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
@@ -18,14 +18,30 @@
*/
package org.apache.pulsar.client.impl.schema;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
+import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import org.apache.pulsar.client.internal.DefaultImplementation;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
/**
@@ -168,4 +184,145 @@ public final class SchemaUtils {
}
+ /**
+  * Render a schema info as pretty-printed JSON.
+  *
+  * <p>The schema bytes are rendered by a {@code ByteArrayToStringAdapter} bound to the
+  * given schema info, and property maps are written with their keys sorted.
+  *
+  * @param schemaInfo the schema info
+  * @return the jsonified schema info
+  */
+ public static String jsonifySchemaInfo(SchemaInfo schemaInfo) {
+     final ByteArrayToStringAdapter schemaBytesAdapter = new ByteArrayToStringAdapter(schemaInfo);
+     return new GsonBuilder()
+         .setPrettyPrinting()
+         .registerTypeHierarchyAdapter(byte[].class, schemaBytesAdapter)
+         .registerTypeHierarchyAdapter(Map.class, SCHEMA_PROPERTIES_SERIALIZER)
+         .create()
+         .toJson(schemaInfo);
+ }
+
+ /**
+  * Gson serializer that writes a properties map as a JSON object whose
+  * members appear in sorted key order.
+  */
+ private static class SchemaPropertiesSerializer implements JsonSerializer<Map<String, String>> {
+
+     @Override
+     public JsonElement serialize(Map<String, String> properties,
+                                  Type type,
+                                  JsonSerializationContext jsonSerializationContext) {
+         // copy into a TreeMap so the entries are visited in sorted key order
+         final SortedMap<String, String> ordered = new TreeMap<>();
+         ordered.putAll(properties);
+
+         final JsonObject json = new JsonObject();
+         for (Map.Entry<String, String> property : ordered.entrySet()) {
+             json.add(property.getKey(), new JsonPrimitive(property.getValue()));
+         }
+         return json;
+     }
+
+ }
+
+ /**
+  * Gson deserializer that reads a JSON object into a sorted map of
+  * string properties, taking each member value as a string.
+  */
+ private static class SchemaPropertiesDeserializer implements JsonDeserializer<Map<String, String>> {
+
+     @Override
+     public Map<String, String> deserialize(JsonElement jsonElement,
+                                            Type type,
+                                            JsonDeserializationContext jsonDeserializationContext)
+             throws JsonParseException {
+
+         final SortedMap<String, String> props = new TreeMap<>();
+         for (Map.Entry<String, JsonElement> member : jsonElement.getAsJsonObject().entrySet()) {
+             props.put(member.getKey(), member.getValue().getAsString());
+         }
+         return props;
+     }
+
+ }
+
+ private static final SchemaPropertiesSerializer SCHEMA_PROPERTIES_SERIALIZER = // shared instance registered as the Map adapter when writing JSON
+ new SchemaPropertiesSerializer();
+
+ private static final SchemaPropertiesDeserializer SCHEMA_PROPERTIES_DESERIALIZER = // shared instance registered as the Map adapter when reading serialized properties
+ new SchemaPropertiesDeserializer();
+
+ /**
+  * Gson serializer for the schema bytes of a schema info.
+  *
+  * <p>The byte array handed to {@link #serialize} is not inspected; the output is derived
+  * from the {@link SchemaInfo} this adapter was constructed with, so an adapter instance
+  * must only be used to serialize that schema info.
+  */
+ private static class ByteArrayToStringAdapter implements JsonSerializer<byte[]> {
+
+     private final SchemaInfo schemaInfo;
+
+     public ByteArrayToStringAdapter(SchemaInfo schemaInfo) {
+         this.schemaInfo = schemaInfo;
+     }
+
+     /**
+      * Render the schema of the bound schema info as a JSON element.
+      *
+      * @param src the raw schema bytes (unused, see class comment)
+      * @param typeOfSrc the source type (unused)
+      * @param context the serialization context (unused)
+      * @return a JSON object for AVRO/JSON/PROTOBUF and KEY_VALUE schemas, otherwise a
+      *         JSON string holding the schema definition
+      */
+     @Override
+     public JsonElement serialize(byte[] src, Type typeOfSrc, JsonSerializationContext context) {
+         // The schema definition is only computed in the branches that use it; the
+         // KEY_VALUE branch builds its output from the decoded key/value schema infos.
+         switch (schemaInfo.getType()) {
+             case AVRO:
+             case JSON:
+             case PROTOBUF:
+                 // these definitions are parsed and embedded as JSON objects
+                 return toJsonObject(schemaInfo.getSchemaDefinition());
+             case KEY_VALUE:
+                 KeyValue<SchemaInfo, SchemaInfo> schemaInfoKeyValue =
+                     DefaultImplementation.decodeKeyValueSchemaInfo(schemaInfo);
+                 JsonObject obj = new JsonObject();
+                 // each side is jsonified on its own, so nested key/value schemas recurse
+                 String keyJson = jsonifySchemaInfo(schemaInfoKeyValue.getKey());
+                 String valueJson = jsonifySchemaInfo(schemaInfoKeyValue.getValue());
+                 obj.add("key", toJsonObject(keyJson));
+                 obj.add("value", toJsonObject(valueJson));
+                 return obj;
+             default:
+                 return new JsonPrimitive(schemaInfo.getSchemaDefinition());
+         }
+     }
+ }
+
+ private static JsonObject toJsonObject(String json) {
+ JsonParser parser = new JsonParser();
+ return parser.parse(json).getAsJsonObject();
+ }
+
+ /**
+  * Gson serializer that embeds a schema info as the JSON object produced by
+  * {@code jsonifySchemaInfo}.
+  */
+ private static class SchemaInfoToStringAdapter implements JsonSerializer<SchemaInfo> {
+
+     @Override
+     public JsonElement serialize(SchemaInfo schemaInfo,
+                                  Type type,
+                                  JsonSerializationContext jsonSerializationContext) {
+         // jsonify to text first, then parse it back so it nests as an object
+         final String schemaInfoJson = jsonifySchemaInfo(schemaInfo);
+         return toJsonObject(schemaInfoJson);
+     }
+ }
+
+ private static final SchemaInfoToStringAdapter SCHEMAINFO_ADAPTER = new SchemaInfoToStringAdapter(); // shared instance registered as the SchemaInfo adapter in jsonifyKeyValueSchemaInfo
+
+ /**
+  * Render a pair of key and value schema infos as JSON.
+  *
+  * <p>Unlike {@code jsonifySchemaInfo}, pretty printing is not enabled here.
+  *
+  * @param kvSchemaInfo the key/value schema info
+  * @return the jsonified schema info
+  */
+ public static String jsonifyKeyValueSchemaInfo(KeyValue<SchemaInfo, SchemaInfo> kvSchemaInfo) {
+     return new GsonBuilder()
+         .registerTypeHierarchyAdapter(SchemaInfo.class, SCHEMAINFO_ADAPTER)
+         .registerTypeHierarchyAdapter(Map.class, SCHEMA_PROPERTIES_SERIALIZER)
+         .create()
+         .toJson(kvSchemaInfo);
+ }
+
+ /**
+  * Serialize schema properties to a JSON string with the keys in sorted order.
+  *
+  * @param properties schema properties
+  * @return the serialized schema properties
+  */
+ public static String serializeSchemaProperties(Map<String, String> properties) {
+     return new GsonBuilder()
+         .registerTypeHierarchyAdapter(Map.class, SCHEMA_PROPERTIES_SERIALIZER)
+         .create()
+         .toJson(properties);
+ }
+
+ /**
+  * Rebuild schema properties from their serialized JSON form.
+  *
+  * @param serializedProperties serialized properties
+  * @return the deserialized properties
+  */
+ public static Map<String, String> deserializeSchemaProperties(String serializedProperties) {
+     return new GsonBuilder()
+         .registerTypeHierarchyAdapter(Map.class, SCHEMA_PROPERTIES_DESERIALIZER)
+         .create()
+         .fromJson(serializedProperties, Map.class);
+ }
+
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java
new file mode 100644
index 0000000..a2e0f46
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
+import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
+import org.apache.pulsar.client.internal.DefaultImplementation;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test {@link KeyValueSchemaInfo}.
+ */
+@Slf4j
+public class KeyValueSchemaInfoTest {
+
+ // Custom properties attached to FOO_SCHEMA. The map is declared with explicit type
+ // arguments (an anonymous subclass cannot use the diamond on Java 8) instead of a raw HashMap.
+ private static final Map<String, String> FOO_PROPERTIES = new HashMap<String, String>() {
+
+     private static final long serialVersionUID = 58641844834472929L;
+
+     {
+         put("foo1", "foo-value1");
+         put("foo2", "foo-value2");
+         put("foo3", "foo-value3");
+     }
+
+ };
+
+ // Custom properties attached to BAR_SCHEMA.
+ private static final Map<String, String> BAR_PROPERTIES = new HashMap<String, String>() {
+
+     private static final long serialVersionUID = 58641844834472929L;
+
+     {
+         put("bar1", "bar-value1");
+         put("bar2", "bar-value2");
+         put("bar3", "bar-value3");
+     }
+
+ };
+
+ public static final Schema<Foo> FOO_SCHEMA = // AVRO schema for Foo, used as the key schema in the tests below; also referenced by SchemaInfoTest
+ Schema.AVRO(SchemaDefinition.<Foo>builder()
+ .withAlwaysAllowNull(false)
+ .withPojo(Foo.class)
+ .withProperties(FOO_PROPERTIES)
+ .build()
+ );
+ public static final Schema<Bar> BAR_SCHEMA = // JSON schema for Bar, used as the value schema in the tests below; also referenced by SchemaInfoTest
+ Schema.JSON(SchemaDefinition.<Bar>builder()
+ .withAlwaysAllowNull(true)
+ .withPojo(Bar.class)
+ .withProperties(BAR_PROPERTIES)
+ .build());
+
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testDecodeNonKeyValueSchemaInfo() {
+ DefaultImplementation.decodeKeyValueSchemaInfo(
+ FOO_SCHEMA.getSchemaInfo()
+ );
+ }
+
+ @DataProvider(name = "encodingTypes")
+ public Object[][] encodingTypes() {
+ return new Object[][] {
+ { KeyValueEncodingType.INLINE },
+ { KeyValueEncodingType.SEPARATED },
+ };
+ }
+
+ @Test(dataProvider = "encodingTypes")
+ public void encodeDecodeKeyValueSchemaInfo(KeyValueEncodingType encodingType) {
+     // schema info as produced through the Schema.KeyValue factory
+     Schema<KeyValue<Foo, Bar>> kvSchema = Schema.KeyValue(FOO_SCHEMA, BAR_SCHEMA, encodingType);
+     SchemaInfo factorySchemaInfo = kvSchema.getSchemaInfo();
+     KeyValueEncodingType factoryEncodingType =
+         DefaultImplementation.decodeKeyValueEncodingType(factorySchemaInfo);
+     assertEquals(factoryEncodingType, encodingType);
+
+     // encoding the two schemas directly must yield the very same schema info
+     SchemaInfo directSchemaInfo =
+         DefaultImplementation.encodeKeyValueSchemaInfo(FOO_SCHEMA, BAR_SCHEMA, encodingType);
+     assertEquals(directSchemaInfo, factorySchemaInfo);
+     KeyValueEncodingType directEncodingType =
+         DefaultImplementation.decodeKeyValueEncodingType(directSchemaInfo);
+     assertEquals(directEncodingType, encodingType);
+
+     // decoding gives back the original key and value schema infos
+     KeyValue<SchemaInfo, SchemaInfo> decoded =
+         DefaultImplementation.decodeKeyValueSchemaInfo(factorySchemaInfo);
+     assertEquals(decoded.getKey(), FOO_SCHEMA.getSchemaInfo());
+     assertEquals(decoded.getValue(), BAR_SCHEMA.getSchemaInfo());
+ }
+
+ @Test(dataProvider = "encodingTypes")
+ public void encodeDecodeNestedKeyValueSchemaInfo(KeyValueEncodingType encodingType) {
+     // the value side is itself a key/value schema (always INLINE)
+     Schema<KeyValue<String, Bar>> innerSchema =
+         Schema.KeyValue(Schema.STRING, BAR_SCHEMA, KeyValueEncodingType.INLINE);
+     Schema<KeyValue<Foo, KeyValue<String, Bar>>> outerSchema =
+         Schema.KeyValue(FOO_SCHEMA, innerSchema, encodingType);
+
+     SchemaInfo outerSchemaInfo = outerSchema.getSchemaInfo();
+     KeyValueEncodingType outerEncodingType =
+         DefaultImplementation.decodeKeyValueEncodingType(outerSchemaInfo);
+     assertEquals(outerEncodingType, encodingType);
+
+     // encoding the two schemas directly must yield the very same schema info
+     SchemaInfo directSchemaInfo =
+         DefaultImplementation.encodeKeyValueSchemaInfo(FOO_SCHEMA, innerSchema, encodingType);
+     assertEquals(directSchemaInfo, outerSchemaInfo);
+     KeyValueEncodingType directEncodingType =
+         DefaultImplementation.decodeKeyValueEncodingType(directSchemaInfo);
+     assertEquals(directEncodingType, encodingType);
+
+     // first level: Foo key, key/value value
+     KeyValue<SchemaInfo, SchemaInfo> outerDecoded =
+         DefaultImplementation.decodeKeyValueSchemaInfo(outerSchemaInfo);
+     assertEquals(outerDecoded.getKey(), FOO_SCHEMA.getSchemaInfo());
+     assertEquals(outerDecoded.getValue().getType(), SchemaType.KEY_VALUE);
+
+     // second level: String key, Bar value
+     KeyValue<SchemaInfo, SchemaInfo> innerDecoded =
+         DefaultImplementation.decodeKeyValueSchemaInfo(outerDecoded.getValue());
+     assertEquals(innerDecoded.getKey(), Schema.STRING.getSchemaInfo());
+     assertEquals(innerDecoded.getValue(), BAR_SCHEMA.getSchemaInfo());
+ }
+
+ /**
+  * A key/value schema info written without any properties (the pre-existing format) must
+  * still decode: the encoding type falls back to INLINE and both sub-schemas come back as
+  * BYTES with their raw schema data intact and no properties.
+  */
+ @Test
+ public void testKeyValueSchemaInfoBackwardCompatibility() {
+     Schema<KeyValue<Foo, Bar>> kvSchema = Schema.KeyValue(
+         FOO_SCHEMA,
+         BAR_SCHEMA,
+         KeyValueEncodingType.SEPARATED
+     );
+
+     // same schema payload, but none of the properties the new encoder records
+     SchemaInfo oldSchemaInfo = new SchemaInfo()
+         .setName("")
+         .setType(SchemaType.KEY_VALUE)
+         .setSchema(kvSchema.getSchemaInfo().getSchema())
+         .setProperties(Collections.emptyMap());
+
+     assertEquals(
+         DefaultImplementation.decodeKeyValueEncodingType(oldSchemaInfo),
+         KeyValueEncodingType.INLINE);
+
+     KeyValue<SchemaInfo, SchemaInfo> schemaInfoKeyValue =
+         DefaultImplementation.decodeKeyValueSchemaInfo(oldSchemaInfo);
+     // verify the key schema
+     SchemaInfo keySchemaInfo = schemaInfoKeyValue.getKey();
+     // TestNG's assertEquals takes (actual, expected), as in the other tests of this class
+     assertEquals(
+         keySchemaInfo.getType(), SchemaType.BYTES
+     );
+     // ArrayAsserts follows the JUnit order: (message, expected, actual)
+     assertArrayEquals(
+         "Expected schema = " + FOO_SCHEMA.getSchemaInfo().getSchemaDefinition()
+             + " but found " + keySchemaInfo.getSchemaDefinition(),
+         FOO_SCHEMA.getSchemaInfo().getSchema(),
+         keySchemaInfo.getSchema()
+     );
+     assertFalse(FOO_SCHEMA.getSchemaInfo().getProperties().isEmpty());
+     assertTrue(keySchemaInfo.getProperties().isEmpty());
+     // verify the value schema
+     SchemaInfo valueSchemaInfo = schemaInfoKeyValue.getValue();
+     assertEquals(
+         valueSchemaInfo.getType(), SchemaType.BYTES
+     );
+     assertArrayEquals(
+         BAR_SCHEMA.getSchemaInfo().getSchema(),
+         valueSchemaInfo.getSchema()
+     );
+     assertFalse(BAR_SCHEMA.getSchemaInfo().getProperties().isEmpty());
+     assertTrue(valueSchemaInfo.getProperties().isEmpty());
+ }
+
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java
new file mode 100644
index 0000000..bacf22b
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import static org.testng.Assert.assertEquals;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test {@link org.apache.pulsar.common.schema.SchemaInfo}.
+ */
+public class SchemaInfoTest {
+
+ private static final String INT32_SCHEMA_INFO = "{\n"
+ + " \"name\": \"INT32\",\n"
+ + " \"schema\": \"\",\n"
+ + " \"type\": \"INT32\",\n"
+ + " \"properties\": {}\n"
+ + "}";
+
+ private static final String UTF8_SCHEMA_INFO = "{\n"
+ + " \"name\": \"String\",\n"
+ + " \"schema\": \"\",\n"
+ + " \"type\": \"STRING\",\n"
+ + " \"properties\": {}\n"
+ + "}";
+
+ private static final String BAR_SCHEMA_INFO = "{\n"
+ + " \"name\": \"\",\n"
+ + " \"schema\": {\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"Bar\",\n"
+ + " \"namespace\": \"org.apache.pulsar.client.impl.schema.SchemaTestUtils$\",\n"
+ + " \"fields\": [\n"
+ + " {\n"
+ + " \"name\": \"field1\",\n"
+ + " \"type\": \"boolean\"\n"
+ + " }\n"
+ + " ]\n"
+ + " },\n"
+ + " \"type\": \"JSON\",\n"
+ + " \"properties\": {\n"
+ + " \"__alwaysAllowNull\": \"true\",\n"
+ + " \"bar1\": \"bar-value1\",\n"
+ + " \"bar2\": \"bar-value2\",\n"
+ + " \"bar3\": \"bar-value3\"\n"
+ + " }\n"
+ + "}";
+
+ private static final String FOO_SCHEMA_INFO = "{\n"
+ + " \"name\": \"\",\n"
+ + " \"schema\": {\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"Foo\",\n"
+ + " \"namespace\": \"org.apache.pulsar.client.impl.schema.SchemaTestUtils$\",\n"
+ + " \"fields\": [\n"
+ + " {\n"
+ + " \"name\": \"field1\",\n"
+ + " \"type\": [\n"
+ + " \"null\",\n"
+ + " \"string\"\n"
+ + " ]\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\": \"field2\",\n"
+ + " \"type\": [\n"
+ + " \"null\",\n"
+ + " \"string\"\n"
+ + " ]\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\": \"field3\",\n"
+ + " \"type\": \"int\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\": \"field4\",\n"
+ + " \"type\": [\n"
+ + " \"null\",\n"
+ + " {\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"Bar\",\n"
+ + " \"fields\": [\n"
+ + " {\n"
+ + " \"name\": \"field1\",\n"
+ + " \"type\": \"boolean\"\n"
+ + " }\n"
+ + " ]\n"
+ + " }\n"
+ + " ]\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\": \"color\",\n"
+ + " \"type\": [\n"
+ + " \"null\",\n"
+ + " {\n"
+ + " \"type\": \"enum\",\n"
+ + " \"name\": \"Color\",\n"
+ + " \"symbols\": [\n"
+ + " \"RED\",\n"
+ + " \"BLUE\"\n"
+ + " ]\n"
+ + " }\n"
+ + " ]\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\": \"fieldUnableNull\",\n"
+ + " \"type\": \"string\",\n"
+ + " \"default\": \"defaultValue\"\n"
+ + " }\n"
+ + " ]\n"
+ + " },\n"
+ + " \"type\": \"AVRO\",\n"
+ + " \"properties\": {\n"
+ + " \"__alwaysAllowNull\": \"false\",\n"
+ + " \"foo1\": \"foo-value1\",\n"
+ + " \"foo2\": \"foo-value2\",\n"
+ + " \"foo3\": \"foo-value3\"\n"
+ + " }\n"
+ + "}";
+
+ private static final String KV_SCHEMA_INFO = "{\n"
+ + " \"name\": \"KeyValue\",\n"
+ + " \"schema\": {\n"
+ + " \"key\": {\n"
+ + " \"name\": \"\",\n"
+ + " \"schema\": {\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"Foo\",\n"
+ + " \"namespace\": \"org.apache.pulsar.client.impl.schema.SchemaTestUtils$\",\n"
+ + " \"fields\": [\n"
+ + " {\n"
+ + " \"name\": \"field1\",\n"
+ + " \"type\": [\n"
+ + " \"null\",\n"
+ + " \"string\"\n"
+ + " ]\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\": \"field2\",\n"
+ + " \"type\": [\n"
+ + " \"null\",\n"
+ + " \"string\"\n"
+ + " ]\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\": \"field3\",\n"
+ + " \"type\": \"int\"\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\": \"field4\",\n"
+ + " \"type\": [\n"
+ + " \"null\",\n"
+ + " {\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"Bar\",\n"
+ + " \"fields\": [\n"
+ + " {\n"
+ + " \"name\": \"field1\",\n"
+ + " \"type\": \"boolean\"\n"
+ + " }\n"
+ + " ]\n"
+ + " }\n"
+ + " ]\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\": \"color\",\n"
+ + " \"type\": [\n"
+ + " \"null\",\n"
+ + " {\n"
+ + " \"type\": \"enum\",\n"
+ + " \"name\": \"Color\",\n"
+ + " \"symbols\": [\n"
+ + " \"RED\",\n"
+ + " \"BLUE\"\n"
+ + " ]\n"
+ + " }\n"
+ + " ]\n"
+ + " },\n"
+ + " {\n"
+ + " \"name\": \"fieldUnableNull\",\n"
+ + " \"type\": \"string\",\n"
+ + " \"default\": \"defaultValue\"\n"
+ + " }\n"
+ + " ]\n"
+ + " },\n"
+ + " \"type\": \"AVRO\",\n"
+ + " \"properties\": {\n"
+ + " \"__alwaysAllowNull\": \"false\",\n"
+ + " \"foo1\": \"foo-value1\",\n"
+ + " \"foo2\": \"foo-value2\",\n"
+ + " \"foo3\": \"foo-value3\"\n"
+ + " }\n"
+ + " },\n"
+ + " \"value\": {\n"
+ + " \"name\": \"\",\n"
+ + " \"schema\": {\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"Bar\",\n"
+ + " \"namespace\": \"org.apache.pulsar.client.impl.schema.SchemaTestUtils$\",\n"
+ + " \"fields\": [\n"
+ + " {\n"
+ + " \"name\": \"field1\",\n"
+ + " \"type\": \"boolean\"\n"
+ + " }\n"
+ + " ]\n"
+ + " },\n"
+ + " \"type\": \"JSON\",\n"
+ + " \"properties\": {\n"
+ + " \"__alwaysAllowNull\": \"true\",\n"
+ + " \"bar1\": \"bar-value1\",\n"
+ + " \"bar2\": \"bar-value2\",\n"
+ + " \"bar3\": \"bar-value3\"\n"
+ + " }\n"
+ + " }\n"
+ + " },\n"
+ + " \"type\": \"KEY_VALUE\",\n"
+ + " \"properties\": {\n"
+ + " \"key.schema.name\": \"\",\n"
+ + " \"key.schema.properties\": \"{\\\"__alwaysAllowNull\\\":\\\"false\\\",\\\"foo1\\\":\\\"foo-value1\\\",\\\"foo2\\\":\\\"foo-value2\\\",\\\"foo3\\\":\\\"foo-value3\\\"}\",\n"
+ + " \"key.schema.type\": \"AVRO\",\n"
+ + " \"kv.encoding.type\": \"SEPARATED\",\n"
+ + " \"value.schema.name\": \"\",\n"
+ + " \"value.schema.properties\": \"{\\\"__alwaysAllowNull\\\":\\\"true\\\",\\\"bar1\\\":\\\"bar-value1\\\",\\\"bar2\\\":\\\"bar-value2\\\",\\\"bar3\\\":\\\"bar-value3\\\"}\",\n"
+ + " \"value.schema.type\": \"JSON\"\n"
+ + " }\n"
+ + "}";
+
+ // Pairs each schema info with the JSON text its toString() is expected to produce.
+ @DataProvider(name = "schemas")
+ public Object[][] schemas() {
+     SchemaInfo stringSchemaInfo = Schema.STRING.getSchemaInfo();
+     SchemaInfo int32SchemaInfo = Schema.INT32.getSchemaInfo();
+     SchemaInfo fooSchemaInfo = KeyValueSchemaInfoTest.FOO_SCHEMA.getSchemaInfo();
+     SchemaInfo barSchemaInfo = KeyValueSchemaInfoTest.BAR_SCHEMA.getSchemaInfo();
+     SchemaInfo kvSchemaInfo = Schema.KeyValue(
+         KeyValueSchemaInfoTest.FOO_SCHEMA,
+         KeyValueSchemaInfoTest.BAR_SCHEMA,
+         KeyValueEncodingType.SEPARATED
+     ).getSchemaInfo();
+
+     return new Object[][] {
+         { stringSchemaInfo, UTF8_SCHEMA_INFO },
+         { int32SchemaInfo, INT32_SCHEMA_INFO },
+         { fooSchemaInfo, FOO_SCHEMA_INFO },
+         { barSchemaInfo, BAR_SCHEMA_INFO },
+         { kvSchemaInfo, KV_SCHEMA_INFO }
+     };
+ }
+
+ @Test(dataProvider = "schemas")
+ public void testSchemaInfoToString(SchemaInfo si, String jsonifiedStr) {
+ assertEquals(si.toString(), jsonifiedStr);
+ }
+
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/common/schema/KeyValueTest.java b/pulsar-client/src/test/java/org/apache/pulsar/common/schema/KeyValueTest.java
new file mode 100644
index 0000000..7f6f425
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/common/schema/KeyValueTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.testng.Assert.assertEquals;
+
+import io.netty.buffer.Unpooled;
+import java.nio.ByteBuffer;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.BooleanSchema;
+import org.apache.pulsar.client.impl.schema.ByteBufSchema;
+import org.apache.pulsar.client.impl.schema.ByteBufferSchema;
+import org.apache.pulsar.client.impl.schema.ByteSchema;
+import org.apache.pulsar.client.impl.schema.BytesSchema;
+import org.apache.pulsar.client.impl.schema.DateSchema;
+import org.apache.pulsar.client.impl.schema.DoubleSchema;
+import org.apache.pulsar.client.impl.schema.FloatSchema;
+import org.apache.pulsar.client.impl.schema.IntSchema;
+import org.apache.pulsar.client.impl.schema.LongSchema;
+import org.apache.pulsar.client.impl.schema.ShortSchema;
+import org.apache.pulsar.client.impl.schema.StringSchema;
+import org.apache.pulsar.client.impl.schema.TimeSchema;
+import org.apache.pulsar.client.impl.schema.TimestampSchema;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test
+public class KeyValueTest {
+
+ // Sample values per primitive schema; every schema is paired with every other one
+ // (as key and as value) by testAllSchemas.
+ private static final Map<Schema, List<Object>> testData = new HashMap() {
+
+     private static final long serialVersionUID = -3081991052949960650L;
+
+     {
+         put(BooleanSchema.of(), Arrays.asList(false, true));
+         put(StringSchema.utf8(), Arrays.asList("my string"));
+         // use the real byte extremes: (byte) 32767 and (byte) -32768 narrow to -1 and 0
+         put(ByteSchema.of(), Arrays.asList(Byte.MAX_VALUE, Byte.MIN_VALUE));
+         put(ShortSchema.of(), Arrays.asList((short) 32767, (short) -32768));
+         put(IntSchema.of(), Arrays.asList((int) 423412424, (int) -41243432));
+         put(LongSchema.of(), Arrays.asList(922337203685477580L, -922337203685477581L));
+         put(FloatSchema.of(), Arrays.asList(5678567.12312f, -5678567.12341f));
+         put(DoubleSchema.of(), Arrays.asList(5678567.12312d, -5678567.12341d));
+         put(BytesSchema.of(), Arrays.asList("my string".getBytes(UTF_8)));
+         put(ByteBufferSchema.of(), Arrays.asList(ByteBuffer.allocate(10).put("my string".getBytes(UTF_8))));
+         put(ByteBufSchema.of(), Arrays.asList(Unpooled.wrappedBuffer("my string".getBytes(UTF_8))));
+         put(DateSchema.of(), Arrays.asList(new Date(new java.util.Date().getTime() - 10000), new Date(new java.util.Date().getTime())));
+         put(TimeSchema.of(), Arrays.asList(new Time(new java.util.Date().getTime() - 10000), new Time(new java.util.Date().getTime())));
+         put(TimestampSchema.of(), Arrays.asList(new Timestamp(new java.util.Date().getTime()), new Timestamp(new java.util.Date().getTime())));
+     }
+ };
+
+ @DataProvider(name = "schemas")
+ public Object[][] schemas() {
+ return new Object[][] {
+ { testData }
+ };
+ }
+
+ @Test(dataProvider = "schemas")
+ public void testAllSchemas(Map<Schema, List<Object>> schemas) {
+     // cross product: every schema is tried as the key against every schema as the value
+     schemas.forEach((keySchema, keys) ->
+         schemas.forEach((valueSchema, values) ->
+             testEncodeDecodeKeyValue(keySchema, valueSchema, keys, values)));
+ }
+
+ /**
+  * Round-trip every key/value combination through {@code KeyValue.encode} and
+  * {@code KeyValue.decode} and check that the decoded pair equals the input.
+  *
+  * @param keySchema schema used for the keys
+  * @param valueSchema schema used for the values
+  * @param keys sample keys
+  * @param values sample values
+  */
+ private <K, V> void testEncodeDecodeKeyValue(Schema<K> keySchema,
+                                              Schema<V> valueSchema,
+                                              List<K> keys,
+                                              List<V> values) {
+     for (K expectedKey : keys) {
+         for (V expectedValue : values) {
+             byte[] encoded = KeyValue.encode(expectedKey, keySchema, expectedValue, valueSchema);
+
+             KeyValue<K, V> decoded = KeyValue.decode(encoded, (keyBytes, valueBytes) -> {
+                 K decodedKey = keySchema.decode(keyBytes);
+                 V decodedValue = valueSchema.decode(valueBytes);
+                 return new KeyValue<>(decodedKey, decodedValue);
+             });
+
+             assertEquals(decoded.getKey(), expectedKey);
+             assertEquals(decoded.getValue(), expectedValue);
+         }
+     }
+ }
+
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java
index e68603e..d534811 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java
@@ -23,6 +23,7 @@ import java.util.Map;
import lombok.Builder;
import lombok.Data;
import lombok.ToString;
+import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
@Builder
@@ -36,4 +37,33 @@ public class SchemaData {
private final byte[] data;
@Builder.Default
private Map<String, String> props = new HashMap<>();
+
+ /**
+  * Build a schema info carrying this schema data's type, bytes and properties.
+  *
+  * <p>The resulting schema info always has an empty name.
+  *
+  * @return the converted schema info.
+  */
+ public SchemaInfo toSchemaInfo() {
+     return SchemaInfo.builder()
+         .schema(data)
+         .type(type)
+         .properties(props)
+         .name("")
+         .build();
+ }
+
+ /**
+  * Build a schema data from the type, schema bytes and properties of a schema info.
+  *
+  * @param schemaInfo schema info
+  * @return the converted schema data
+  */
+ public static SchemaData fromSchemaInfo(SchemaInfo schemaInfo) {
+     return SchemaData.builder()
+         .data(schemaInfo.getSchema())
+         .type(schemaInfo.getType())
+         .props(schemaInfo.getProperties())
+         .build();
+ }
+
}
\ No newline at end of file
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
index 2a400ae..3af34d7 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
@@ -159,7 +159,7 @@ public class CLITest extends PulsarTestSuite {
"schemas",
"get",
topicName);
- assertTrue(result.getStdout().contains("\"type\":\"STRING\""));
+ assertTrue(result.getStdout().contains("\"type\": \"STRING\""));
// delete the schema
result = container.execCmd(