You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/10/12 07:14:35 UTC
[pulsar] branch branch-2.11 updated: [improve][client] Refactor SchemaHash to reduce call of hashFunction in SchemaHash (#17948)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 2e85be99747 [improve][client] Refactor SchemaHash to reduce call of hashFunction in SchemaHash (#17948)
2e85be99747 is described below
commit 2e85be99747ad57d0ebca0e0104605eb62d189cf
Author: Xiaoyu Hou <ho...@apache.org>
AuthorDate: Wed Oct 12 15:11:39 2022 +0800
[improve][client] Refactor SchemaHash to reduce call of hashFunction in SchemaHash (#17948)
---
.../org/apache/pulsar/client/impl/MessageImpl.java | 2 +-
.../pulsar/client/impl/schema/BooleanSchema.java | 8 ++---
.../pulsar/client/impl/schema/ByteBufSchema.java | 8 ++---
.../client/impl/schema/ByteBufferSchema.java | 8 ++---
.../pulsar/client/impl/schema/ByteSchema.java | 8 ++---
.../pulsar/client/impl/schema/BytesSchema.java | 8 ++---
.../pulsar/client/impl/schema/DateSchema.java | 8 ++---
.../pulsar/client/impl/schema/DoubleSchema.java | 8 ++---
.../pulsar/client/impl/schema/FloatSchema.java | 8 ++---
.../pulsar/client/impl/schema/InstantSchema.java | 8 ++---
.../pulsar/client/impl/schema/IntSchema.java | 8 ++---
.../pulsar/client/impl/schema/JSONSchema.java | 11 ++++---
.../pulsar/client/impl/schema/LocalDateSchema.java | 8 ++---
.../client/impl/schema/LocalDateTimeSchema.java | 8 ++---
.../pulsar/client/impl/schema/LocalTimeSchema.java | 8 ++---
.../pulsar/client/impl/schema/LongSchema.java | 8 ++---
.../pulsar/client/impl/schema/ShortSchema.java | 8 ++---
.../pulsar/client/impl/schema/StringSchema.java | 19 ++++++-----
.../pulsar/client/impl/schema/TimeSchema.java | 8 ++---
.../pulsar/client/impl/schema/TimestampSchema.java | 8 ++---
.../client/impl/schema/KeyValueSchemaInfoTest.java | 10 +++---
.../client/impl/schema/StringSchemaTest.java | 20 ++++++------
.../pulsar/client/impl/schema/SchemaInfoImpl.java | 34 ++++++++++++++++---
.../pulsar/common/protocol/schema/SchemaHash.java | 25 +++++++++-----
.../io/kafka/connect/KafkaConnectSinkTest.java | 38 ++++++++++++----------
25 files changed, 168 insertions(+), 127 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 6d1d586b49b..f42ad9b6abf 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -436,7 +436,7 @@ public class MessageImpl<T> implements Message<T> {
}
public SchemaHash getSchemaHash() {
- return schemaHash == null ? SchemaHash.of(new byte[0], null) : schemaHash;
+ return schemaHash == null ? SchemaHash.empty() : schemaHash;
}
public void setSchemaInfoForReplicator(SchemaInfo schemaInfo) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java
index 0c41e1f183b..49548c1a209 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java
@@ -32,10 +32,10 @@ public class BooleanSchema extends AbstractSchema<Boolean> {
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfoImpl()
- .setName("Boolean")
- .setType(SchemaType.BOOLEAN)
- .setSchema(new byte[0]);
+ SCHEMA_INFO = SchemaInfoImpl.builder()
+ .name("Boolean")
+ .type(SchemaType.BOOLEAN)
+ .schema(new byte[0]).build();
INSTANCE = new BooleanSchema();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java
index 7665d96ee72..73431ae1d55 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java
@@ -33,10 +33,10 @@ public class ByteBufSchema extends AbstractSchema<ByteBuf> {
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfoImpl()
- .setName("ByteBuf")
- .setType(SchemaType.BYTES)
- .setSchema(new byte[0]);
+ SCHEMA_INFO = SchemaInfoImpl.builder()
+ .name("ByteBuf")
+ .type(SchemaType.BYTES)
+ .schema(new byte[0]).build();
INSTANCE = new ByteBufSchema();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
index 15184131f30..0bc4e9ce320 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
@@ -32,10 +32,10 @@ public class ByteBufferSchema extends AbstractSchema<ByteBuffer> {
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfoImpl()
- .setName("ByteBuffer")
- .setType(SchemaType.BYTES)
- .setSchema(new byte[0]);
+ SCHEMA_INFO = SchemaInfoImpl.builder()
+ .name("ByteBuffer")
+ .type(SchemaType.BYTES)
+ .schema(new byte[0]).build();
INSTANCE = new ByteBufferSchema();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
index 6d516879bd5..3e56381675b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
@@ -32,10 +32,10 @@ public class ByteSchema extends AbstractSchema<Byte> {
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfoImpl()
- .setName("INT8")
- .setType(SchemaType.INT8)
- .setSchema(new byte[0]);
+ SCHEMA_INFO = SchemaInfoImpl.builder()
+ .name("INT8")
+ .type(SchemaType.INT8)
+ .schema(new byte[0]).build();
INSTANCE = new ByteSchema();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
index 98a0e66439d..5706af9505a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
@@ -31,10 +31,10 @@ public class BytesSchema extends AbstractSchema<byte[]> {
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfoImpl()
- .setName("Bytes")
- .setType(SchemaType.BYTES)
- .setSchema(new byte[0]);
+ SCHEMA_INFO = SchemaInfoImpl.builder()
+ .name("Bytes")
+ .type(SchemaType.BYTES)
+ .schema(new byte[0]).build();
INSTANCE = new BytesSchema();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java
index f632b99fc94..79976917e46 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java
@@ -32,10 +32,10 @@ public class DateSchema extends AbstractSchema<Date> {
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfoImpl()
- .setName("Date")
- .setType(SchemaType.DATE)
- .setSchema(new byte[0]);
+ SCHEMA_INFO = SchemaInfoImpl.builder()
+ .name("Date")
+ .type(SchemaType.DATE)
+ .schema(new byte[0]).build();
INSTANCE = new DateSchema();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
index d38deb474ab..b5d8076a046 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
@@ -32,10 +32,10 @@ public class DoubleSchema extends AbstractSchema<Double> {
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfoImpl()
- .setName("Double")
- .setType(SchemaType.DOUBLE)
- .setSchema(new byte[0]);
+ SCHEMA_INFO = SchemaInfoImpl.builder()
+ .name("Double")
+ .type(SchemaType.DOUBLE)
+ .schema(new byte[0]).build();
INSTANCE = new DoubleSchema();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
index 84d40735bc1..0d26411cb72 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
@@ -32,10 +32,10 @@ public class FloatSchema extends AbstractSchema<Float> {
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfoImpl()
- .setName("Float")
- .setType(SchemaType.FLOAT)
- .setSchema(new byte[0]);
+ SCHEMA_INFO = SchemaInfoImpl.builder()
+ .name("Float")
+ .type(SchemaType.FLOAT)
+ .schema(new byte[0]).build();
INSTANCE = new FloatSchema();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java
index 8adf7c1e9c2..569a9380eb8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java
@@ -33,10 +33,10 @@ public class InstantSchema extends AbstractSchema<Instant> {
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfoImpl()
- .setName("Instant")
- .setType(SchemaType.INSTANT)
- .setSchema(new byte[0]);
+ SCHEMA_INFO = SchemaInfoImpl.builder()
+ .name("Instant")
+ .type(SchemaType.INSTANT)
+ .schema(new byte[0]).build();
INSTANCE = new InstantSchema();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
index dfad2808218..521256532c6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
@@ -32,10 +32,10 @@ public class IntSchema extends AbstractSchema<Integer> {
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfoImpl()
- .setName("INT32")
- .setType(SchemaType.INT32)
- .setSchema(new byte[0]);
+ SCHEMA_INFO = SchemaInfoImpl.builder()
+ .name("INT32")
+ .type(SchemaType.INT32)
+ .schema(new byte[0]).build();
INSTANCE = new IntSchema();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
index 4f09ffe35b9..ad421d8cde3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
@@ -71,11 +71,12 @@ public class JSONSchema<T> extends AvroBaseStructSchema<T> {
ObjectMapper objectMapper = new ObjectMapper();
JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(objectMapper);
JsonSchema jsonBackwardsCompatibleSchema = schemaGen.generateSchema(pojo);
- backwardsCompatibleSchemaInfo = new SchemaInfoImpl()
- .setName("")
- .setProperties(schemaInfo.getProperties())
- .setType(SchemaType.JSON)
- .setSchema(objectMapper.writeValueAsBytes(jsonBackwardsCompatibleSchema));
+ backwardsCompatibleSchemaInfo = SchemaInfoImpl.builder()
+ .name("")
+ .properties(schemaInfo.getProperties())
+ .type(SchemaType.JSON)
+ .schema(objectMapper.writeValueAsBytes(jsonBackwardsCompatibleSchema))
+ .build();
} catch (JsonProcessingException ex) {
throw new RuntimeException(ex);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java
index 4115eee9c85..5c0420a0a44 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java
@@ -32,10 +32,10 @@ public class LocalDateSchema extends AbstractSchema<LocalDate> {
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfoImpl()
- .setName("LocalDate")
- .setType(SchemaType.LOCAL_DATE)
- .setSchema(new byte[0]);
+ SCHEMA_INFO = SchemaInfoImpl.builder()
+ .name("LocalDate")
+ .type(SchemaType.LOCAL_DATE)
+ .schema(new byte[0]).build();
INSTANCE = new LocalDateSchema();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java
index a6bac5f417e..e7c5965b55b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java
@@ -36,10 +36,10 @@ public class LocalDateTimeSchema extends AbstractSchema<LocalDateTime> {
public static final String DELIMITER = ":";
static {
- SCHEMA_INFO = new SchemaInfoImpl()
- .setName("LocalDateTime")
- .setType(SchemaType.LOCAL_DATE_TIME)
- .setSchema(new byte[0]);
+ SCHEMA_INFO = SchemaInfoImpl.builder()
+ .name("LocalDateTime")
+ .type(SchemaType.LOCAL_DATE_TIME)
+ .schema(new byte[0]).build();
INSTANCE = new LocalDateTimeSchema();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java
index cdf06632c17..59ba5d2b860 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java
@@ -32,10 +32,10 @@ public class LocalTimeSchema extends AbstractSchema<LocalTime> {
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfoImpl()
- .setName("LocalTime")
- .setType(SchemaType.LOCAL_TIME)
- .setSchema(new byte[0]);
+ SCHEMA_INFO = SchemaInfoImpl.builder()
+ .name("LocalTime")
+ .type(SchemaType.LOCAL_TIME)
+ .schema(new byte[0]).build();
INSTANCE = new LocalTimeSchema();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
index deccaf4ded8..e136fb39f2a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
@@ -32,10 +32,10 @@ public class LongSchema extends AbstractSchema<Long> {
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfoImpl()
- .setName("INT64")
- .setType(SchemaType.INT64)
- .setSchema(new byte[0]);
+ SCHEMA_INFO = SchemaInfoImpl.builder()
+ .name("INT64")
+ .type(SchemaType.INT64)
+ .schema(new byte[0]).build();
INSTANCE = new LongSchema();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
index bbb5ad67529..afa0f9a3f12 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
@@ -32,10 +32,10 @@ public class ShortSchema extends AbstractSchema<Short> {
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfoImpl()
- .setName("INT16")
- .setType(SchemaType.INT16)
- .setSchema(new byte[0]);
+ SCHEMA_INFO = SchemaInfoImpl.builder()
+ .name("INT16")
+ .type(SchemaType.INT16)
+ .schema(new byte[0]).build();
INSTANCE = new ShortSchema();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
index 370946aba37..2096f83bcb7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
@@ -43,10 +43,10 @@ public class StringSchema extends AbstractSchema<String> {
// Ensure the ordering of the static initialization
CHARSET_KEY = "__charset";
DEFAULT_CHARSET = StandardCharsets.UTF_8;
- DEFAULT_SCHEMA_INFO = new SchemaInfoImpl()
- .setName("String")
- .setType(SchemaType.STRING)
- .setSchema(new byte[0]);
+ DEFAULT_SCHEMA_INFO = SchemaInfoImpl.builder()
+ .name("String")
+ .type(SchemaType.STRING)
+ .schema(new byte[0]).build();
UTF8 = new StringSchema(StandardCharsets.UTF_8);
}
@@ -84,11 +84,12 @@ public class StringSchema extends AbstractSchema<String> {
this.charset = charset;
Map<String, String> properties = new HashMap<>();
properties.put(CHARSET_KEY, charset.name());
- this.schemaInfo = new SchemaInfoImpl()
- .setName(DEFAULT_SCHEMA_INFO.getName())
- .setType(SchemaType.STRING)
- .setSchema(DEFAULT_SCHEMA_INFO.getSchema())
- .setProperties(properties);
+ this.schemaInfo = SchemaInfoImpl.builder()
+ .name(DEFAULT_SCHEMA_INFO.getName())
+ .type(SchemaType.STRING)
+ .schema(DEFAULT_SCHEMA_INFO.getSchema())
+ .properties(properties)
+ .build();
}
public byte[] encode(String message) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java
index 2be6e9d3a1e..43fc0693f17 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java
@@ -32,10 +32,10 @@ public class TimeSchema extends AbstractSchema<Time> {
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfoImpl()
- .setName("Time")
- .setType(SchemaType.TIME)
- .setSchema(new byte[0]);
+ SCHEMA_INFO = SchemaInfoImpl.builder()
+ .name("Time")
+ .type(SchemaType.TIME)
+ .schema(new byte[0]).build();
INSTANCE = new TimeSchema();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java
index 0b42d47146e..c89dfefc98b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java
@@ -32,10 +32,10 @@ public class TimestampSchema extends AbstractSchema<Timestamp> {
private static final SchemaInfo SCHEMA_INFO;
static {
- SCHEMA_INFO = new SchemaInfoImpl()
- .setName("Timestamp")
- .setType(SchemaType.TIMESTAMP)
- .setSchema(new byte[0]);
+ SCHEMA_INFO = SchemaInfoImpl.builder()
+ .name("Timestamp")
+ .type(SchemaType.TIMESTAMP)
+ .schema(new byte[0]).build();
INSTANCE = new TimestampSchema();
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java
index 188c1e72e54..169156917f4 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java
@@ -170,11 +170,11 @@ public class KeyValueSchemaInfoTest {
KeyValueEncodingType.SEPARATED
);
- SchemaInfo oldSchemaInfo = new SchemaInfoImpl()
- .setName("")
- .setType(SchemaType.KEY_VALUE)
- .setSchema(kvSchema.getSchemaInfo().getSchema())
- .setProperties(Collections.emptyMap());
+ SchemaInfo oldSchemaInfo = SchemaInfoImpl.builder()
+ .name("")
+ .type(SchemaType.KEY_VALUE)
+ .schema(kvSchema.getSchemaInfo().getSchema())
+ .properties(Collections.emptyMap()).build();
assertEquals(
DefaultImplementation.getDefaultImplementation().decodeKeyValueEncodingType(oldSchemaInfo),
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java
index 8a9897445f2..4b220857614 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java
@@ -86,11 +86,11 @@ public class StringSchemaTest {
@Test
public void testSchemaInfoWithoutCharset() {
- SchemaInfo si = new SchemaInfoImpl()
- .setName("test-schema-info-without-charset")
- .setType(SchemaType.STRING)
- .setSchema(new byte[0])
- .setProperties(Collections.emptyMap());
+ SchemaInfo si = SchemaInfoImpl.builder()
+ .name("test-schema-info-without-charset")
+ .type(SchemaType.STRING)
+ .schema(new byte[0])
+ .properties(Collections.emptyMap()).build();
StringSchema schema = StringSchema.fromSchemaInfo(si);
String myString = "my string for test";
@@ -121,11 +121,11 @@ public class StringSchemaTest {
public void testSchemaInfoWithCharset(Charset charset) {
Map<String, String> properties = new HashMap<>();
properties.put(StringSchema.CHARSET_KEY, charset.name());
- SchemaInfo si = new SchemaInfoImpl()
- .setName("test-schema-info-without-charset")
- .setType(SchemaType.STRING)
- .setSchema(new byte[0])
- .setProperties(properties);
+ SchemaInfo si = SchemaInfoImpl.builder()
+ .name("test-schema-info-without-charset")
+ .type(SchemaType.STRING)
+ .schema(new byte[0])
+ .properties(properties).build();
StringSchema schema = StringSchema.fromSchemaInfo(si);
String myString = "my string for test";
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoImpl.java
index d67dc5f29e1..b06ee517ac7 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoImpl.java
@@ -19,10 +19,10 @@
package org.apache.pulsar.client.impl.schema;
import static java.nio.charset.StandardCharsets.UTF_8;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import java.util.Base64;
import java.util.Collections;
import java.util.Map;
-import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -30,6 +30,7 @@ import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
+import org.apache.pulsar.common.protocol.schema.SchemaHash;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
@@ -40,10 +41,8 @@ import org.apache.pulsar.common.schema.SchemaType;
@InterfaceAudience.Public
@InterfaceStability.Stable
@Data
-@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)
-@Builder
public class SchemaInfoImpl implements SchemaInfo {
@EqualsAndHashCode.Exclude
@@ -67,9 +66,23 @@ public class SchemaInfoImpl implements SchemaInfo {
/**
* Additional properties of the schema definition (implementation defined).
*/
- @Builder.Default
private Map<String, String> properties = Collections.emptyMap();
+ @EqualsAndHashCode.Exclude
+ @JsonIgnore
+ private transient SchemaHash schemaHash;
+
+ @Builder
+ public SchemaInfoImpl(String name, byte[] schema, SchemaType type, long timestamp,
+ Map<String, String> properties) {
+ this.name = name;
+ this.schema = schema;
+ this.type = type;
+ this.timestamp = timestamp;
+ this.properties = properties == null ? Collections.emptyMap() : properties;
+ this.schemaHash = SchemaHash.of(this.schema, this.type);
+ }
+
public String getSchemaDefinition() {
if (null == schema) {
return "";
@@ -89,6 +102,19 @@ public class SchemaInfoImpl implements SchemaInfo {
}
}
+ /**
+ * Calculate the SchemaHash for compatible with `@NoArgsConstructor`.
+ * If SchemaInfoImpl is created by no-args-constructor from users, the schemaHash will be null.
+ * Note: We should remove this method as long as `@NoArgsConstructor` removed at major release to avoid null-check
+ * overhead.
+ */
+ public SchemaHash getSchemaHash() {
+ if (schemaHash == null) {
+ schemaHash = SchemaHash.of(this.schema, this.type);
+ }
+ return schemaHash;
+ }
+
@Override
public String toString() {
return SchemaUtils.jsonifySchemaInfo(this);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java
index 8bbc18fbb70..46f92b1ec9a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaHash.java
@@ -21,9 +21,9 @@ package org.apache.pulsar.common.protocol.schema;
import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
-import java.util.Optional;
import lombok.EqualsAndHashCode;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
@@ -33,7 +33,8 @@ import org.apache.pulsar.common.schema.SchemaType;
@EqualsAndHashCode
public class SchemaHash {
- private static HashFunction hashFunction = Hashing.sha256();
+ private static final HashFunction hashFunction = Hashing.sha256();
+ private static final SchemaHash EMPTY_SCHEMA_HASH = new SchemaHash(hashFunction.hashBytes(new byte[0]), null);
private final HashCode hash;
@@ -45,9 +46,10 @@ public class SchemaHash {
}
public static SchemaHash of(Schema schema) {
- Optional<SchemaInfo> schemaInfo = Optional.ofNullable(schema).map(Schema::getSchemaInfo);
- return of(schemaInfo.map(SchemaInfo::getSchema).orElseGet(() -> new byte[0]),
- schemaInfo.map(SchemaInfo::getType).orElse(null));
+ if (schema == null || schema.getSchemaInfo() == null) {
+ return EMPTY_SCHEMA_HASH;
+ }
+ return ((SchemaInfoImpl) schema.getSchemaInfo()).getSchemaHash();
}
public static SchemaHash of(SchemaData schemaData) {
@@ -55,12 +57,19 @@ public class SchemaHash {
}
public static SchemaHash of(SchemaInfo schemaInfo) {
- return of(schemaInfo == null ? new byte[0] : schemaInfo.getSchema(),
- schemaInfo == null ? null : schemaInfo.getType());
+ if (schemaInfo == null) {
+ return EMPTY_SCHEMA_HASH;
+ }
+ return ((SchemaInfoImpl) schemaInfo).getSchemaHash();
}
+ public static SchemaHash empty() {
+ return EMPTY_SCHEMA_HASH;
+ }
+
+ // Shouldn't call this method frequently, otherwise will bring performance regression
public static SchemaHash of(byte[] schemaBytes, SchemaType schemaType) {
- return new SchemaHash(hashFunction.hashBytes(schemaBytes), schemaType);
+ return new SchemaHash(hashFunction.hashBytes(schemaBytes == null ? new byte[0] : schemaBytes), schemaType);
}
public byte[] asBytes() {
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 0c260b4eb65..d3e1a40550f 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -692,10 +692,11 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
@Test
public void kafkaLogicalTypesTimestampTest() {
- Schema schema = new TestSchema(new SchemaInfoImpl()
- .setName(Timestamp.LOGICAL_NAME)
- .setType(SchemaType.INT64)
- .setSchema(new byte[0]));
+ Schema schema = new TestSchema(SchemaInfoImpl.builder()
+ .name(Timestamp.LOGICAL_NAME)
+ .type(SchemaType.INT64)
+ .schema(new byte[0])
+ .build());
org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
.getKafkaConnectSchema(schema);
@@ -709,10 +710,11 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
@Test
public void kafkaLogicalTypesTimeTest() {
- Schema schema = new TestSchema(new SchemaInfoImpl()
- .setName(Time.LOGICAL_NAME)
- .setType(SchemaType.INT32)
- .setSchema(new byte[0]));
+ Schema schema = new TestSchema(SchemaInfoImpl.builder()
+ .name(Time.LOGICAL_NAME)
+ .type(SchemaType.INT32)
+ .schema(new byte[0])
+ .build());
org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
.getKafkaConnectSchema(schema);
@@ -726,10 +728,11 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
@Test
public void kafkaLogicalTypesDateTest() {
- Schema schema = new TestSchema(new SchemaInfoImpl()
- .setName(Date.LOGICAL_NAME)
- .setType(SchemaType.INT32)
- .setSchema(new byte[0]));
+ Schema schema = new TestSchema(SchemaInfoImpl.builder()
+ .name(Date.LOGICAL_NAME)
+ .type(SchemaType.INT32)
+ .schema(new byte[0])
+ .build());
org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
.getKafkaConnectSchema(schema);
@@ -745,11 +748,12 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
public void kafkaLogicalTypesDecimalTest() {
Map<String, String> props = new HashMap<>();
props.put("scale", "10");
- Schema schema = new TestSchema(new SchemaInfoImpl()
- .setName(Decimal.LOGICAL_NAME)
- .setType(SchemaType.BYTES)
- .setProperties(props)
- .setSchema(new byte[0]));
+ Schema schema = new TestSchema(SchemaInfoImpl.builder()
+ .name(Decimal.LOGICAL_NAME)
+ .type(SchemaType.BYTES)
+ .properties(props)
+ .schema(new byte[0])
+ .build());
org.apache.kafka.connect.data.Schema kafkaSchema = PulsarSchemaToKafkaSchema
.getKafkaConnectSchema(schema);