You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/05/13 07:46:27 UTC
[pulsar] 33/46: Add Schema.getNativeSchema (#10076)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch 2.7.2_ds_tmp
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2932b1057a2fda22354ac825e5ff34553131cc52
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Thu Apr 1 17:33:56 2021 +0200
Add Schema.getNativeSchema (#10076)
(cherry picked from commit 4128151412031f9a354d2296b69533d9533a9fbf)
---
.../apache/pulsar/client/api/SimpleSchemaTest.java | 5 +++++
.../java/org/apache/pulsar/client/api/Schema.java | 11 +++++++++++
.../client/impl/schema/AutoConsumeSchema.java | 2 +-
.../client/impl/schema/AvroBaseStructSchema.java | 7 +++++++
.../client/impl/schema/ProtobufNativeSchema.java | 6 ++++++
.../pulsar/client/impl/schema/AvroSchemaTest.java | 9 ++++++++-
.../pulsar/client/impl/schema/JSONSchemaTest.java | 4 ++++
.../impl/schema/ProtobufNativeSchemaTest.java | 23 +++++++++++++++++-----
8 files changed, 60 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index ae44311..802d059 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -528,7 +528,12 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
Message<GenericRecord> data = c.receive();
assertNotNull(data.getSchemaVersion());
assertEquals(data.getValue().getField("i"), i);
+ MessageImpl impl = (MessageImpl) data;
+
+ org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) impl.getSchema().getNativeSchema().get();
+ assertNotNull(avroSchema);
}
+
}
}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
index 9b41836..a9df5cc 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -26,6 +26,8 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;
+import java.util.Optional;
+
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.GenericObject;
@@ -158,6 +160,15 @@ public interface Schema<T> extends Cloneable{
Schema<byte[]> BYTES = DefaultImplementation.newBytesSchema();
/**
+ * Return the native schema that is wrapped by Pulsar API.
+ * For instance, with this method you can access the Avro schema.
+ * @return an Optional holding the internal schema, or an empty Optional if not present
+ */
+ default Optional<Object> getNativeSchema() {
+ return Optional.empty();
+ }
+
+ /**
* ByteBuffer Schema.
*/
Schema<ByteBuffer> BYTEBUFFER = DefaultImplementation.newByteBufferSchema();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 1e43fc4..3ec21a2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -117,7 +117,6 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
}
@Override
-<<<<<<< HEAD
public Schema<GenericRecord> clone() {
Schema<GenericRecord> schema = Schema.AUTO_CONSUME();
if (this.schema != null) {
@@ -131,6 +130,7 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
return schema;
}
+ @Override
public Optional<Object> getNativeSchema() {
ensureSchemaInitialized();
if (schema == null) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroBaseStructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroBaseStructSchema.java
index b464731..5e449b4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroBaseStructSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroBaseStructSchema.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.client.impl.schema;
import org.apache.avro.Schema;
import org.apache.pulsar.common.schema.SchemaInfo;
+import java.util.Optional;
+
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseAvroSchema;
@@ -46,4 +48,9 @@ public abstract class AvroBaseStructSchema<T> extends AbstractStructSchema<T>{
public Schema getAvroSchema(){
return schema;
}
+
+ @Override
+ public Optional<Object> getNativeSchema() {
+ return Optional.of(schema);
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java
index 839cb17..02a1b46 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java
@@ -38,6 +38,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.function.Consumer;
/**
@@ -102,6 +103,11 @@ public class ProtobufNativeSchema<T extends GeneratedMessageV3> extends Abstract
return ProtobufNativeSchemaUtils.deserialize(this.schemaInfo.getSchema());
}
+ @Override
+ public Optional<Object> getNativeSchema() {
+ return Optional.of(getProtobufNativeSchema());
+ }
+
public static <T extends GeneratedMessageV3> ProtobufNativeSchema<T> of(Class<T> pojo) {
return of(pojo, new HashMap<>());
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
index 9116e28..dcf3abe 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
@@ -25,6 +25,7 @@ import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_AVRO_A
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.fail;
+import static org.testng.AssertJUnit.assertSame;
import java.math.BigDecimal;
import java.time.Instant;
@@ -126,10 +127,16 @@ public class AvroSchemaTest {
}
@Test
+ public void testGetNativeSchema() throws SchemaValidationException {
+ AvroSchema<StructWithAnnotations> schema2 = AvroSchema.of(StructWithAnnotations.class);
+ org.apache.avro.Schema avroSchema2 = (Schema) schema2.getNativeSchema().get();
+ assertSame(schema2.schema, avroSchema2);
+ }
+
+ @Test
public void testSchemaDefinition() throws SchemaValidationException {
org.apache.avro.Schema schema1 = ReflectData.get().getSchema(DefaultStruct.class);
AvroSchema<StructWithAnnotations> schema2 = AvroSchema.of(StructWithAnnotations.class);
-
String schemaDef1 = schema1.toString();
String schemaDef2 = new String(schema2.getSchemaInfo().getSchema(), UTF_8);
assertNotEquals(
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java
index 172bae3..586eb08 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java
@@ -27,10 +27,14 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.assertSame;
+import static org.testng.AssertJUnit.assertSame;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
+import org.apache.avro.SchemaValidationException;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java
index 9300ce5..59f1b80 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl.schema;
+import com.google.protobuf.Descriptors;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import lombok.extern.slf4j.Slf4j;
@@ -29,6 +30,10 @@ import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.AssertJUnit.assertSame;
+
@Slf4j
public class ProtobufNativeSchemaTest {
@@ -51,7 +56,7 @@ public class ProtobufNativeSchemaTest {
byte[] bytes = protobufSchema.encode(testMessage);
org.apache.pulsar.client.schema.proto.Test.TestMessage message = protobufSchema.decode(bytes);
- Assert.assertEquals(message.getStringField(), stringFieldValue);
+ assertEquals(message.getStringField(), stringFieldValue);
}
@Test
@@ -59,10 +64,10 @@ public class ProtobufNativeSchemaTest {
ProtobufNativeSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage> protobufSchema
= ProtobufNativeSchema.of(org.apache.pulsar.client.schema.proto.Test.TestMessage.class);
- Assert.assertEquals(protobufSchema.getSchemaInfo().getType(), SchemaType.PROTOBUF_NATIVE);
+ assertEquals(protobufSchema.getSchemaInfo().getType(), SchemaType.PROTOBUF_NATIVE);
- Assert.assertNotNull(ProtobufNativeSchemaUtils.deserialize(protobufSchema.getSchemaInfo().getSchema()));
- Assert.assertEquals(new String(protobufSchema.getSchemaInfo().getSchema(), StandardCharsets.UTF_8), EXPECTED_SCHEMA_JSON);
+ assertNotNull(ProtobufNativeSchemaUtils.deserialize(protobufSchema.getSchemaInfo().getSchema()));
+ assertEquals(new String(protobufSchema.getSchemaInfo().getSchema(), StandardCharsets.UTF_8), EXPECTED_SCHEMA_JSON);
}
@Test
@@ -96,8 +101,16 @@ public class ProtobufNativeSchemaTest {
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(bytes.length);
byteBuf.writeBytes(bytes);
- Assert.assertEquals(testMessage, protobufSchema.decode(byteBuf));
+ assertEquals(testMessage, protobufSchema.decode(byteBuf));
+
+ }
+ @Test
+ public void testGetNativeSchema() {
+ ProtobufNativeSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage> protobufSchema
+ = ProtobufNativeSchema.of(org.apache.pulsar.client.schema.proto.Test.TestMessage.class);
+ Descriptors.Descriptor nativeSchema = (Descriptors.Descriptor) protobufSchema.getNativeSchema().get();
+ assertNotNull(nativeSchema);
}
}