You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/10/12 07:14:35 UTC

[pulsar] branch branch-2.11 updated: [improve][client] Refactor SchemaHash to reduce call of hashFunction in SchemaHash (#17948)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 2e85be99747 [improve][client] Refactor SchemaHash to reduce call of hashFunction in SchemaHash (#17948)
2e85be99747 is described below

commit 2e85be99747ad57d0ebca0e0104605eb62d189cf
Author: Xiaoyu Hou <ho...@apache.org>
AuthorDate: Wed Oct 12 15:11:39 2022 +0800

    [improve][client] Refactor SchemaHash to reduce call of hashFunction in SchemaHash (#17948)
---
 .../org/apache/pulsar/client/impl/MessageImpl.java |  2 +-
 .../pulsar/client/impl/schema/BooleanSchema.java   |  8 ++---
 .../pulsar/client/impl/schema/ByteBufSchema.java   |  8 ++---
 .../client/impl/schema/ByteBufferSchema.java       |  8 ++---
 .../pulsar/client/impl/schema/ByteSchema.java      |  8 ++---
 .../pulsar/client/impl/schema/BytesSchema.java     |  8 ++---
 .../pulsar/client/impl/schema/DateSchema.java      |  8 ++---
 .../pulsar/client/impl/schema/DoubleSchema.java    |  8 ++---
 .../pulsar/client/impl/schema/FloatSchema.java     |  8 ++---
 .../pulsar/client/impl/schema/InstantSchema.java   |  8 ++---
 .../pulsar/client/impl/schema/IntSchema.java       |  8 ++---
 .../pulsar/client/impl/schema/JSONSchema.java      | 11 ++++---
 .../pulsar/client/impl/schema/LocalDateSchema.java |  8 ++---
 .../client/impl/schema/LocalDateTimeSchema.java    |  8 ++---
 .../pulsar/client/impl/schema/LocalTimeSchema.java |  8 ++---
 .../pulsar/client/impl/schema/LongSchema.java      |  8 ++---
 .../pulsar/client/impl/schema/ShortSchema.java     |  8 ++---
 .../pulsar/client/impl/schema/StringSchema.java    | 19 ++++++-----
 .../pulsar/client/impl/schema/TimeSchema.java      |  8 ++---
 .../pulsar/client/impl/schema/TimestampSchema.java |  8 ++---
 .../client/impl/schema/KeyValueSchemaInfoTest.java | 10 +++---
 .../client/impl/schema/StringSchemaTest.java       | 20 ++++++------
 .../pulsar/client/impl/schema/SchemaInfoImpl.java  | 34 ++++++++++++++++---
 .../pulsar/common/protocol/schema/SchemaHash.java  | 25 +++++++++-----
 .../io/kafka/connect/KafkaConnectSinkTest.java     | 38 ++++++++++++----------
 25 files changed, 168 insertions(+), 127 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 6d1d586b49b..f42ad9b6abf 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -436,7 +436,7 @@ public class MessageImpl<T> implements Message<T> {
     }
 
     public SchemaHash getSchemaHash() {
-        return schemaHash == null ? SchemaHash.of(new byte[0], null) : schemaHash;
+        return schemaHash == null ? SchemaHash.empty() : schemaHash;
     }
 
     public void setSchemaInfoForReplicator(SchemaInfo schemaInfo) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java
index 0c41e1f183b..49548c1a209 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java
@@ -32,10 +32,10 @@ public class BooleanSchema extends AbstractSchema<Boolean> {
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfoImpl()
-                .setName("Boolean")
-                .setType(SchemaType.BOOLEAN)
-                .setSchema(new byte[0]);
+        SCHEMA_INFO = SchemaInfoImpl.builder()
+                .name("Boolean")
+                .type(SchemaType.BOOLEAN)
+                .schema(new byte[0]).build();
         INSTANCE = new BooleanSchema();
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java
index 7665d96ee72..73431ae1d55 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java
@@ -33,10 +33,10 @@ public class ByteBufSchema extends AbstractSchema<ByteBuf> {
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfoImpl()
-            .setName("ByteBuf")
-            .setType(SchemaType.BYTES)
-            .setSchema(new byte[0]);
+        SCHEMA_INFO = SchemaInfoImpl.builder()
+            .name("ByteBuf")
+            .type(SchemaType.BYTES)
+            .schema(new byte[0]).build();
         INSTANCE = new ByteBufSchema();
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
index 15184131f30..0bc4e9ce320 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
@@ -32,10 +32,10 @@ public class ByteBufferSchema extends AbstractSchema<ByteBuffer> {
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfoImpl()
-            .setName("ByteBuffer")
-            .setType(SchemaType.BYTES)
-            .setSchema(new byte[0]);
+        SCHEMA_INFO = SchemaInfoImpl.builder()
+            .name("ByteBuffer")
+            .type(SchemaType.BYTES)
+            .schema(new byte[0]).build();
         INSTANCE = new ByteBufferSchema();
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
index 6d516879bd5..3e56381675b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
@@ -32,10 +32,10 @@ public class ByteSchema extends AbstractSchema<Byte> {
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfoImpl()
-            .setName("INT8")
-            .setType(SchemaType.INT8)
-            .setSchema(new byte[0]);
+        SCHEMA_INFO = SchemaInfoImpl.builder()
+            .name("INT8")
+            .type(SchemaType.INT8)
+            .schema(new byte[0]).build();
         INSTANCE = new ByteSchema();
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
index 98a0e66439d..5706af9505a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
@@ -31,10 +31,10 @@ public class BytesSchema extends AbstractSchema<byte[]> {
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfoImpl()
-            .setName("Bytes")
-            .setType(SchemaType.BYTES)
-            .setSchema(new byte[0]);
+        SCHEMA_INFO = SchemaInfoImpl.builder()
+            .name("Bytes")
+            .type(SchemaType.BYTES)
+            .schema(new byte[0]).build();
         INSTANCE = new BytesSchema();
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java
index f632b99fc94..79976917e46 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java
@@ -32,10 +32,10 @@ public class DateSchema extends AbstractSchema<Date> {
    private static final SchemaInfo SCHEMA_INFO;
 
    static {
-       SCHEMA_INFO = new SchemaInfoImpl()
-             .setName("Date")
-             .setType(SchemaType.DATE)
-             .setSchema(new byte[0]);
+       SCHEMA_INFO = SchemaInfoImpl.builder()
+             .name("Date")
+             .type(SchemaType.DATE)
+             .schema(new byte[0]).build();
        INSTANCE = new DateSchema();
    }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
index d38deb474ab..b5d8076a046 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
@@ -32,10 +32,10 @@ public class DoubleSchema extends AbstractSchema<Double> {
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfoImpl()
-            .setName("Double")
-            .setType(SchemaType.DOUBLE)
-            .setSchema(new byte[0]);
+        SCHEMA_INFO = SchemaInfoImpl.builder()
+            .name("Double")
+            .type(SchemaType.DOUBLE)
+            .schema(new byte[0]).build();
         INSTANCE = new DoubleSchema();
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
index 84d40735bc1..0d26411cb72 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
@@ -32,10 +32,10 @@ public class FloatSchema extends AbstractSchema<Float> {
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfoImpl()
-                .setName("Float")
-                .setType(SchemaType.FLOAT)
-                .setSchema(new byte[0]);
+        SCHEMA_INFO = SchemaInfoImpl.builder()
+                .name("Float")
+                .type(SchemaType.FLOAT)
+                .schema(new byte[0]).build();
         INSTANCE = new FloatSchema();
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java
index 8adf7c1e9c2..569a9380eb8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java
@@ -33,10 +33,10 @@ public class InstantSchema extends AbstractSchema<Instant> {
    private static final SchemaInfo SCHEMA_INFO;
 
    static {
-       SCHEMA_INFO = new SchemaInfoImpl()
-             .setName("Instant")
-             .setType(SchemaType.INSTANT)
-             .setSchema(new byte[0]);
+       SCHEMA_INFO = SchemaInfoImpl.builder()
+             .name("Instant")
+             .type(SchemaType.INSTANT)
+             .schema(new byte[0]).build();
        INSTANCE = new InstantSchema();
    }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
index dfad2808218..521256532c6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
@@ -32,10 +32,10 @@ public class IntSchema extends AbstractSchema<Integer> {
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfoImpl()
-            .setName("INT32")
-            .setType(SchemaType.INT32)
-            .setSchema(new byte[0]);
+        SCHEMA_INFO = SchemaInfoImpl.builder()
+            .name("INT32")
+            .type(SchemaType.INT32)
+            .schema(new byte[0]).build();
         INSTANCE = new IntSchema();
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
index 4f09ffe35b9..ad421d8cde3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
@@ -71,11 +71,12 @@ public class JSONSchema<T> extends AvroBaseStructSchema<T> {
             ObjectMapper objectMapper = new ObjectMapper();
             JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(objectMapper);
             JsonSchema jsonBackwardsCompatibleSchema = schemaGen.generateSchema(pojo);
-            backwardsCompatibleSchemaInfo = new SchemaInfoImpl()
-                    .setName("")
-                    .setProperties(schemaInfo.getProperties())
-                    .setType(SchemaType.JSON)
-                    .setSchema(objectMapper.writeValueAsBytes(jsonBackwardsCompatibleSchema));
+            backwardsCompatibleSchemaInfo = SchemaInfoImpl.builder()
+                    .name("")
+                    .properties(schemaInfo.getProperties())
+                    .type(SchemaType.JSON)
+                    .schema(objectMapper.writeValueAsBytes(jsonBackwardsCompatibleSchema))
+                    .build();
         } catch (JsonProcessingException ex) {
             throw new RuntimeException(ex);
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java
index 4115eee9c85..5c0420a0a44 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java
@@ -32,10 +32,10 @@ public class LocalDateSchema extends AbstractSchema<LocalDate> {
    private static final SchemaInfo SCHEMA_INFO;
 
    static {
-       SCHEMA_INFO = new SchemaInfoImpl()
-             .setName("LocalDate")
-             .setType(SchemaType.LOCAL_DATE)
-             .setSchema(new byte[0]);
+       SCHEMA_INFO = SchemaInfoImpl.builder()
+             .name("LocalDate")
+             .type(SchemaType.LOCAL_DATE)
+             .schema(new byte[0]).build();
        INSTANCE = new LocalDateSchema();
    }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java
index a6bac5f417e..e7c5965b55b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java
@@ -36,10 +36,10 @@ public class LocalDateTimeSchema extends AbstractSchema<LocalDateTime> {
    public static final String DELIMITER = ":";
 
    static {
-       SCHEMA_INFO = new SchemaInfoImpl()
-             .setName("LocalDateTime")
-             .setType(SchemaType.LOCAL_DATE_TIME)
-             .setSchema(new byte[0]);
+       SCHEMA_INFO = SchemaInfoImpl.builder()
+             .name("LocalDateTime")
+             .type(SchemaType.LOCAL_DATE_TIME)
+             .schema(new byte[0]).build();
        INSTANCE = new LocalDateTimeSchema();
    }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java
index cdf06632c17..59ba5d2b860 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java
@@ -32,10 +32,10 @@ public class LocalTimeSchema extends AbstractSchema<LocalTime> {
    private static final SchemaInfo SCHEMA_INFO;
 
    static {
-       SCHEMA_INFO = new SchemaInfoImpl()
-             .setName("LocalTime")
-             .setType(SchemaType.LOCAL_TIME)
-             .setSchema(new byte[0]);
+       SCHEMA_INFO = SchemaInfoImpl.builder()
+             .name("LocalTime")
+             .type(SchemaType.LOCAL_TIME)
+             .schema(new byte[0]).build();
        INSTANCE = new LocalTimeSchema();
    }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
index deccaf4ded8..e136fb39f2a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
@@ -32,10 +32,10 @@ public class LongSchema extends AbstractSchema<Long> {
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfoImpl()
-            .setName("INT64")
-            .setType(SchemaType.INT64)
-            .setSchema(new byte[0]);
+        SCHEMA_INFO = SchemaInfoImpl.builder()
+            .name("INT64")
+            .type(SchemaType.INT64)
+            .schema(new byte[0]).build();
         INSTANCE = new LongSchema();
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
index bbb5ad67529..afa0f9a3f12 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
@@ -32,10 +32,10 @@ public class ShortSchema extends AbstractSchema<Short> {
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfoImpl()
-            .setName("INT16")
-            .setType(SchemaType.INT16)
-            .setSchema(new byte[0]);
+        SCHEMA_INFO = SchemaInfoImpl.builder()
+            .name("INT16")
+            .type(SchemaType.INT16)
+            .schema(new byte[0]).build();
         INSTANCE = new ShortSchema();
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
index 370946aba37..2096f83bcb7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
@@ -43,10 +43,10 @@ public class StringSchema extends AbstractSchema<String> {
         // Ensure the ordering of the static initialization
         CHARSET_KEY = "__charset";
         DEFAULT_CHARSET = StandardCharsets.UTF_8;
-        DEFAULT_SCHEMA_INFO = new SchemaInfoImpl()
-                .setName("String")
-                .setType(SchemaType.STRING)
-                .setSchema(new byte[0]);
+        DEFAULT_SCHEMA_INFO = SchemaInfoImpl.builder()
+                .name("String")
+                .type(SchemaType.STRING)
+                .schema(new byte[0]).build();
 
         UTF8 = new StringSchema(StandardCharsets.UTF_8);
     }
@@ -84,11 +84,12 @@ public class StringSchema extends AbstractSchema<String> {
         this.charset = charset;
         Map<String, String> properties = new HashMap<>();
         properties.put(CHARSET_KEY, charset.name());
-        this.schemaInfo = new SchemaInfoImpl()
-                .setName(DEFAULT_SCHEMA_INFO.getName())
-                .setType(SchemaType.STRING)
-                .setSchema(DEFAULT_SCHEMA_INFO.getSchema())
-                .setProperties(properties);
+        this.schemaInfo = SchemaInfoImpl.builder()
+                .name(DEFAULT_SCHEMA_INFO.getName())
+                .type(SchemaType.STRING)
+                .schema(DEFAULT_SCHEMA_INFO.getSchema())
+                .properties(properties)
+                .build();
     }
 
     public byte[] encode(String message) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java
index 2be6e9d3a1e..43fc0693f17 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java
@@ -32,10 +32,10 @@ public class TimeSchema extends AbstractSchema<Time> {
    private static final SchemaInfo SCHEMA_INFO;
 
    static {
-       SCHEMA_INFO = new SchemaInfoImpl()
-             .setName("Time")
-             .setType(SchemaType.TIME)
-             .setSchema(new byte[0]);
+       SCHEMA_INFO = SchemaInfoImpl.builder()
+             .name("Time")
+             .type(SchemaType.TIME)
+             .schema(new byte[0]).build();
        INSTANCE = new TimeSchema();
    }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java
index 0b42d47146e..c89dfefc98b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java
@@ -32,10 +32,10 @@ public class TimestampSchema extends AbstractSchema<Timestamp> {
    private static final SchemaInfo SCHEMA_INFO;
 
    static {
-       SCHEMA_INFO = new SchemaInfoImpl()
-             .setName("Timestamp")
-             .setType(SchemaType.TIMESTAMP)
-             .setSchema(new byte[0]);
+       SCHEMA_INFO = SchemaInfoImpl.builder()
+             .name("Timestamp")
+             .type(SchemaType.TIMESTAMP)
+             .schema(new byte[0]).build();
        INSTANCE = new TimestampSchema();
    }
 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java
index 188c1e72e54..169156917f4 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java
@@ -170,11 +170,11 @@ public class KeyValueSchemaInfoTest {
             KeyValueEncodingType.SEPARATED
         );
 
-        SchemaInfo oldSchemaInfo = new SchemaInfoImpl()
-            .setName("")
-            .setType(SchemaType.KEY_VALUE)
-            .setSchema(kvSchema.getSchemaInfo().getSchema())
-            .setProperties(Collections.emptyMap());
+        SchemaInfo oldSchemaInfo = SchemaInfoImpl.builder()
+            .name("")
+            .type(SchemaType.KEY_VALUE)
+            .schema(kvSchema.getSchemaInfo().getSchema())
+            .properties(Collections.emptyMap()).build();
 
         assertEquals(
                 DefaultImplementation.getDefaultImplementation().decodeKeyValueEncodingType(oldSchemaInfo),
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java
index 8a9897445f2..4b220857614 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java
@@ -86,11 +86,11 @@ public class StringSchemaTest {
 
     @Test
     public void testSchemaInfoWithoutCharset() {
-        SchemaInfo si = new SchemaInfoImpl()
-            .setName("test-schema-info-without-charset")
-            .setType(SchemaType.STRING)
-            .setSchema(new byte[0])
-            .setProperties(Collections.emptyMap());
+        SchemaInfo si = SchemaInfoImpl.builder()
+            .name("test-schema-info-without-charset")
+            .type(SchemaType.STRING)
+            .schema(new byte[0])
+            .properties(Collections.emptyMap()).build();
         StringSchema schema = StringSchema.fromSchemaInfo(si);
 
         String myString = "my string for test";
@@ -121,11 +121,11 @@ public class StringSchemaTest {
     public void testSchemaInfoWithCharset(Charset charset) {
         Map<String, String> properties = new HashMap<>();
         properties.put(StringSchema.CHARSET_KEY, charset.name());
-        SchemaInfo si = new SchemaInfoImpl()
-            .setName("test-schema-info-without-charset")
-            .setType(SchemaType.STRING)
-            .setSchema(new byte[0])
-            .setProperties(properties);
+        SchemaInfo si = SchemaInfoImpl.builder()
+            .name("test-schema-info-without-charset")
+            .type(SchemaType.STRING)
+            .schema(new byte[0])
+            .properties(properties).build();
         StringSchema schema = StringSchema.fromSchemaInfo(si);
 
         String myString = "my string for test";
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoImpl.java
index d67dc5f29e1..b06ee517ac7 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoImpl.java
@@ -19,10 +19,10 @@
 package org.apache.pulsar.client.impl.schema;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import java.util.Base64;
 import java.util.Collections;
 import java.util.Map;
-import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
@@ -30,6 +30,7 @@ import lombok.NoArgsConstructor;
 import lombok.experimental.Accessors;
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
+import org.apache.pulsar.common.protocol.schema.SchemaHash;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -40,10 +41,8 @@ import org.apache.pulsar.common.schema.SchemaType;
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 @Data
-@AllArgsConstructor
 @NoArgsConstructor
 @Accessors(chain = true)
-@Builder
 public class SchemaInfoImpl implements SchemaInfo {
 
     @EqualsAndHashCode.Exclude
@@ -67,9 +66,23 @@ public class SchemaInfoImpl implements SchemaInfo {
     /**
      * Additional properties of the schema definition (implementation defined).
      */
-    @Builder.Default
     private Map<String, String> properties = Collections.emptyMap();
 
+    @EqualsAndHashCode.Exclude
+    @JsonIgnore
+    private transient SchemaHash schemaHash;
+
+    @Builder
+    public SchemaInfoImpl(String name, byte[] schema, SchemaType type, long timestamp,
+                          Map<String, String> properties) {
+        this.name = name;
+        this.schema = schema;
+        this.type = type;
+        this.timestamp = timestamp;
+        this.properties = properties == null ? Collections.emptyMap() : properties;
+        this.schemaHash = SchemaHash.of(this.schema, this.type);
+    }
+
     public String getSchemaDefinition() {
         if (null == schema) {
             return "";
@@ -89,6 +102,19 @@ public class SchemaInfoImpl implements SchemaInfo {
         }
     }
 
+    /**
+     * Calculate the SchemaHash lazily, for compatibility with `@NoArgsConstructor`.
+     * If a SchemaInfoImpl is created by users through the no-args constructor, the schemaHash will be null.
+     * Note: We should remove this method once `@NoArgsConstructor` is removed in a major release, to avoid the
+     * null-check overhead.
+     */
+    public SchemaHash getSchemaHash() {
+        if (schemaHash == null) {
+            schemaHash = SchemaHash.of(this.schema, this.type);
+        }
+        return schemaHash;
+    }
+
     @Override
     public String toString() {
         return SchemaUtils.jsonifySchemaInfo(this);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java
index 8bbc18fbb70..46f92b1ec9a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java
@@ -21,9 +21,9 @@ package org.apache.pulsar.common.protocol.schema;
 import com.google.common.hash.HashCode;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
-import java.util.Optional;
 import lombok.EqualsAndHashCode;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
@@ -33,7 +33,8 @@ import org.apache.pulsar.common.schema.SchemaType;
 @EqualsAndHashCode
 public class SchemaHash {
 
-    private static HashFunction hashFunction = Hashing.sha256();
+    private static final HashFunction hashFunction = Hashing.sha256();
+    private static final SchemaHash EMPTY_SCHEMA_HASH = new SchemaHash(hashFunction.hashBytes(new byte[0]), null);
 
     private final HashCode hash;
 
@@ -45,9 +46,10 @@ public class SchemaHash {
     }
 
     public static SchemaHash of(Schema schema) {
-        Optional<SchemaInfo> schemaInfo = Optional.ofNullable(schema).map(Schema::getSchemaInfo);
-        return of(schemaInfo.map(SchemaInfo::getSchema).orElseGet(() -> new byte[0]),
-                schemaInfo.map(SchemaInfo::getType).orElse(null));
+        if (schema == null || schema.getSchemaInfo() == null) {
+            return EMPTY_SCHEMA_HASH;
+        }
+        return ((SchemaInfoImpl) schema.getSchemaInfo()).getSchemaHash();
     }
 
     public static SchemaHash of(SchemaData schemaData) {
@@ -55,12 +57,19 @@ public class SchemaHash {
     }
 
     public static SchemaHash of(SchemaInfo schemaInfo) {
-        return of(schemaInfo == null ? new byte[0] : schemaInfo.getSchema(),
-                schemaInfo == null ? null : schemaInfo.getType());
+        if (schemaInfo == null) {
+            return EMPTY_SCHEMA_HASH;
+        }
+        return ((SchemaInfoImpl) schemaInfo).getSchemaHash();
     }
 
+    public static SchemaHash empty() {
+        return EMPTY_SCHEMA_HASH;
+    }
+
+    // Avoid calling this method frequently: it hashes the bytes on every call, which hurts performance.
     public static SchemaHash of(byte[] schemaBytes, SchemaType schemaType) {
-        return new SchemaHash(hashFunction.hashBytes(schemaBytes), schemaType);
+        return new SchemaHash(hashFunction.hashBytes(schemaBytes == null ? new byte[0] : schemaBytes), schemaType);
     }
 
     public byte[] asBytes() {
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 0c260b4eb65..d3e1a40550f 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -692,10 +692,11 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
 
     @Test
     public void kafkaLogicalTypesTimestampTest() {
-        Schema schema = new TestSchema(new SchemaInfoImpl()
-                .setName(Timestamp.LOGICAL_NAME)
-                .setType(SchemaType.INT64)
-                .setSchema(new byte[0]));
+        Schema schema = new TestSchema(SchemaInfoImpl.builder()
+                .name(Timestamp.LOGICAL_NAME)
+                .type(SchemaType.INT64)
+                .schema(new byte[0])
+                .build());
 
         org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
                 .getKafkaConnectSchema(schema);
@@ -709,10 +710,11 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
 
     @Test
     public void kafkaLogicalTypesTimeTest() {
-        Schema schema = new TestSchema(new SchemaInfoImpl()
-                .setName(Time.LOGICAL_NAME)
-                .setType(SchemaType.INT32)
-                .setSchema(new byte[0]));
+        Schema schema = new TestSchema(SchemaInfoImpl.builder()
+                .name(Time.LOGICAL_NAME)
+                .type(SchemaType.INT32)
+                .schema(new byte[0])
+                .build());
 
         org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
                 .getKafkaConnectSchema(schema);
@@ -726,10 +728,11 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
 
     @Test
     public void kafkaLogicalTypesDateTest() {
-        Schema schema = new TestSchema(new SchemaInfoImpl()
-                .setName(Date.LOGICAL_NAME)
-                .setType(SchemaType.INT32)
-                .setSchema(new byte[0]));
+        Schema schema = new TestSchema(SchemaInfoImpl.builder()
+                .name(Date.LOGICAL_NAME)
+                .type(SchemaType.INT32)
+                .schema(new byte[0])
+                .build());
 
         org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
                 .getKafkaConnectSchema(schema);
@@ -745,11 +748,12 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
     public void kafkaLogicalTypesDecimalTest() {
         Map<String, String> props = new HashMap<>();
         props.put("scale", "10");
-        Schema schema = new TestSchema(new SchemaInfoImpl()
-                .setName(Decimal.LOGICAL_NAME)
-                .setType(SchemaType.BYTES)
-                .setProperties(props)
-                .setSchema(new byte[0]));
+        Schema schema = new TestSchema(SchemaInfoImpl.builder()
+                .name(Decimal.LOGICAL_NAME)
+                .type(SchemaType.BYTES)
+                .properties(props)
+                .schema(new byte[0])
+                .build());
 
         org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
                 .getKafkaConnectSchema(schema);