You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/05/13 07:46:27 UTC

[pulsar] 33/46: Add Schema.getNativeSchema (#10076)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch 2.7.2_ds_tmp
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2932b1057a2fda22354ac825e5ff34553131cc52
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Thu Apr 1 17:33:56 2021 +0200

    Add Schema.getNativeSchema (#10076)
    
    (cherry picked from commit 4128151412031f9a354d2296b69533d9533a9fbf)
---
 .../apache/pulsar/client/api/SimpleSchemaTest.java |  5 +++++
 .../java/org/apache/pulsar/client/api/Schema.java  | 11 +++++++++++
 .../client/impl/schema/AutoConsumeSchema.java      |  2 +-
 .../client/impl/schema/AvroBaseStructSchema.java   |  7 +++++++
 .../client/impl/schema/ProtobufNativeSchema.java   |  6 ++++++
 .../pulsar/client/impl/schema/AvroSchemaTest.java  |  9 ++++++++-
 .../pulsar/client/impl/schema/JSONSchemaTest.java  |  4 ++++
 .../impl/schema/ProtobufNativeSchemaTest.java      | 23 +++++++++++++++++-----
 8 files changed, 60 insertions(+), 7 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index ae44311..802d059 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -528,7 +528,12 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
                 Message<GenericRecord> data = c.receive();
                 assertNotNull(data.getSchemaVersion());
                 assertEquals(data.getValue().getField("i"), i);
+                MessageImpl impl = (MessageImpl) data;
+
+                org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) impl.getSchema().getNativeSchema().get();
+                assertNotNull(avroSchema);
             }
+
         }
     }
 
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
index 9b41836..a9df5cc 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -26,6 +26,8 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.Date;
+import java.util.Optional;
+
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericSchema;
 import org.apache.pulsar.client.api.schema.GenericObject;
@@ -158,6 +160,15 @@ public interface Schema<T> extends Cloneable{
     Schema<byte[]> BYTES = DefaultImplementation.newBytesSchema();
 
     /**
+     * Return the native schema that is wrapped by Pulsar API.
+     * For instance with this method you can access the Avro schema
+     * @return the internal schema or null if not present
+     */
+    default Optional<Object> getNativeSchema() {
+        return Optional.empty();
+    }
+
+    /**
      * ByteBuffer Schema.
      */
     Schema<ByteBuffer> BYTEBUFFER = DefaultImplementation.newByteBufferSchema();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 1e43fc4..3ec21a2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -117,7 +117,6 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
     }
 
     @Override
-<<<<<<< HEAD
     public Schema<GenericRecord> clone() {
         Schema<GenericRecord> schema = Schema.AUTO_CONSUME();
         if (this.schema != null) {
@@ -131,6 +130,7 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
         return schema;
     }
 
+    @Override
     public Optional<Object> getNativeSchema() {
         ensureSchemaInitialized();
         if (schema == null) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroBaseStructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroBaseStructSchema.java
index b464731..5e449b4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroBaseStructSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroBaseStructSchema.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.client.impl.schema;
 import org.apache.avro.Schema;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
+import java.util.Optional;
+
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseAvroSchema;
 
@@ -46,4 +48,9 @@ public abstract class AvroBaseStructSchema<T> extends AbstractStructSchema<T>{
     public Schema getAvroSchema(){
         return schema;
     }
+
+    @Override
+    public Optional<Object> getNativeSchema() {
+        return Optional.of(schema);
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java
index 839cb17..02a1b46 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java
@@ -38,6 +38,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.function.Consumer;
 
 /**
@@ -102,6 +103,11 @@ public class ProtobufNativeSchema<T extends GeneratedMessageV3> extends Abstract
         return ProtobufNativeSchemaUtils.deserialize(this.schemaInfo.getSchema());
     }
 
+    @Override
+    public Optional<Object> getNativeSchema() {
+        return Optional.of(getProtobufNativeSchema());
+    }
+
     public static <T extends GeneratedMessageV3> ProtobufNativeSchema<T> of(Class<T> pojo) {
         return of(pojo, new HashMap<>());
     }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
index 9116e28..dcf3abe 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
@@ -25,6 +25,7 @@ import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_AVRO_A
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.fail;
+import static org.testng.AssertJUnit.assertSame;
 
 import java.math.BigDecimal;
 import java.time.Instant;
@@ -126,10 +127,16 @@ public class AvroSchemaTest {
     }
 
     @Test
+    public void testGetNativeSchema() throws SchemaValidationException {
+        AvroSchema<StructWithAnnotations> schema2 = AvroSchema.of(StructWithAnnotations.class);
+        org.apache.avro.Schema avroSchema2 = (Schema) schema2.getNativeSchema().get();
+        assertSame(schema2.schema, avroSchema2);
+    }
+
+    @Test
     public void testSchemaDefinition() throws SchemaValidationException {
         org.apache.avro.Schema schema1 = ReflectData.get().getSchema(DefaultStruct.class);
         AvroSchema<StructWithAnnotations> schema2 = AvroSchema.of(StructWithAnnotations.class);
-
         String schemaDef1 = schema1.toString();
         String schemaDef2 = new String(schema2.getSchemaInfo().getSchema(), UTF_8);
         assertNotEquals(
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java
index 172bae3..586eb08 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java
@@ -27,10 +27,14 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.AssertJUnit.assertSame;
 
+import static org.testng.AssertJUnit.assertSame;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
+import org.apache.avro.SchemaValidationException;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.Field;
 import org.apache.pulsar.client.api.schema.GenericRecord;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java
index 9300ce5..59f1b80 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl.schema;
 
+import com.google.protobuf.Descriptors;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import lombok.extern.slf4j.Slf4j;
@@ -29,6 +30,10 @@ import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.HashMap;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.AssertJUnit.assertSame;
+
 @Slf4j
 public class ProtobufNativeSchemaTest {
 
@@ -51,7 +56,7 @@ public class ProtobufNativeSchemaTest {
         byte[] bytes = protobufSchema.encode(testMessage);
         org.apache.pulsar.client.schema.proto.Test.TestMessage message = protobufSchema.decode(bytes);
 
-        Assert.assertEquals(message.getStringField(), stringFieldValue);
+        assertEquals(message.getStringField(), stringFieldValue);
     }
 
     @Test
@@ -59,10 +64,10 @@ public class ProtobufNativeSchemaTest {
         ProtobufNativeSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage> protobufSchema
                 = ProtobufNativeSchema.of(org.apache.pulsar.client.schema.proto.Test.TestMessage.class);
 
-        Assert.assertEquals(protobufSchema.getSchemaInfo().getType(), SchemaType.PROTOBUF_NATIVE);
+        assertEquals(protobufSchema.getSchemaInfo().getType(), SchemaType.PROTOBUF_NATIVE);
 
-        Assert.assertNotNull(ProtobufNativeSchemaUtils.deserialize(protobufSchema.getSchemaInfo().getSchema()));
-        Assert.assertEquals(new String(protobufSchema.getSchemaInfo().getSchema(), StandardCharsets.UTF_8), EXPECTED_SCHEMA_JSON);
+        assertNotNull(ProtobufNativeSchemaUtils.deserialize(protobufSchema.getSchemaInfo().getSchema()));
+        assertEquals(new String(protobufSchema.getSchemaInfo().getSchema(), StandardCharsets.UTF_8), EXPECTED_SCHEMA_JSON);
     }
 
     @Test
@@ -96,8 +101,16 @@ public class ProtobufNativeSchemaTest {
         ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(bytes.length);
         byteBuf.writeBytes(bytes);
 
-        Assert.assertEquals(testMessage, protobufSchema.decode(byteBuf));
+        assertEquals(testMessage, protobufSchema.decode(byteBuf));
+
+    }
 
+    @Test
+    public void testGetNativeSchema()  {
+        ProtobufNativeSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage> protobufSchema
+                = ProtobufNativeSchema.of(org.apache.pulsar.client.schema.proto.Test.TestMessage.class);
+        Descriptors.Descriptor nativeSchema = (Descriptors.Descriptor) protobufSchema.getNativeSchema().get();
+        assertNotNull(nativeSchema);
     }
 
 }