You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/12 07:31:28 UTC

[pulsar] branch branch-2.8 updated (3505b98 -> 89ac98e)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 3505b98  Kafka connect sink adaptor to support non-primitive schemas (#10410)
     new 1916324  Upgrade Jetty to 9.4.42.v20210604 (#10907)
     new 0929015  [Security] Upgrade Zookeeper to 3.6.3 (#10852)
     new 89ac98e  Remove the unwanted dependencies in the pulsar function's instance jar and make SchemaInfo an interface (#10878)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 distribution/server/pom.xml                        |  6 ++
 distribution/server/src/assemble/LICENSE.bin.txt   | 44 +++++-----
 pom.xml                                            | 13 ++-
 .../apache/pulsar/broker/service/ServerCnx.java    |  2 +-
 .../admin/AdminApiSchemaValidationEnforced.java    | 34 ++++----
 .../schema/JsonSchemaCompatibilityCheckTest.java   | 12 +--
 .../java/org/apache/pulsar/schema/SchemaTest.java  |  5 +-
 .../SchemaCompatibilityCheckTest.java              |  3 +-
 .../pulsar/client/admin/internal/SchemasImpl.java  | 15 ++--
 .../apache/pulsar/common/schema/SchemaInfo.java    | 52 ++----------
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  2 +-
 .../pulsar/client/impl/HttpLookupService.java      |  2 +-
 .../client/impl/schema/AutoProduceBytesSchema.java |  7 +-
 .../pulsar/client/impl/schema/BooleanSchema.java   |  2 +-
 .../pulsar/client/impl/schema/ByteBufSchema.java   |  2 +-
 .../client/impl/schema/ByteBufferSchema.java       |  2 +-
 .../pulsar/client/impl/schema/ByteSchema.java      |  2 +-
 .../pulsar/client/impl/schema/BytesSchema.java     |  2 +-
 .../pulsar/client/impl/schema/DateSchema.java      |  2 +-
 .../pulsar/client/impl/schema/DoubleSchema.java    |  2 +-
 .../pulsar/client/impl/schema/FloatSchema.java     |  2 +-
 .../pulsar/client/impl/schema/InstantSchema.java   |  2 +-
 .../pulsar/client/impl/schema/IntSchema.java       |  2 +-
 .../pulsar/client/impl/schema/JSONSchema.java      | 10 +--
 .../pulsar/client/impl/schema/LocalDateSchema.java |  2 +-
 .../client/impl/schema/LocalDateTimeSchema.java    |  2 +-
 .../pulsar/client/impl/schema/LocalTimeSchema.java |  2 +-
 .../pulsar/client/impl/schema/LongSchema.java      |  2 +-
 .../client/impl/schema/ProtobufNativeSchema.java   |  7 +-
 .../pulsar/client/impl/schema/ProtobufSchema.java  |  7 +-
 .../impl/schema/RecordSchemaBuilderImpl.java       |  2 +-
 .../pulsar/client/impl}/schema/SchemaInfoUtil.java | 48 ++++++-----
 .../pulsar/client/impl/schema/ShortSchema.java     |  2 +-
 .../pulsar/client/impl/schema/StringSchema.java    |  4 +-
 .../pulsar/client/impl/schema/StructSchema.java    |  2 +-
 .../pulsar/client/impl/schema/TimeSchema.java      |  2 +-
 .../pulsar/client/impl/schema/TimestampSchema.java |  2 +-
 .../pulsar/client/impl/schema/util/SchemaUtil.java |  3 +-
 .../src/main/resources/findbugsExclude.xml         | 16 ++++
 .../client/impl/schema/KeyValueSchemaInfoTest.java |  2 +-
 .../client/impl/schema/KeyValueSchemaTest.java     | 31 ++++---
 .../pulsar/client/impl/schema/SchemaInfoTest.java  | 16 ++--
 .../client/impl/schema/StringSchemaTest.java       |  4 +-
 pulsar-common/pom.xml                              |  4 +
 .../admin/internal/data/AuthPoliciesImpl.java      | 20 +++--
 .../client/impl/schema/KeyValueSchemaInfo.java     | 15 ++--
 .../pulsar/client/impl/schema/SchemaInfoImpl.java  | 18 ++---
 .../pulsar/client/impl/schema/SchemaUtils.java     |  0
 .../pulsar/common/protocol/schema/SchemaData.java  |  3 +-
 pulsar-functions/runtime-all/pom.xml               | 30 ++++++-
 .../functions/instance/JavaInstanceMain.java       |  1 +
 .../functions/instance/JavaInstanceDepsTest.java   | 77 ++++++++++++++++++
 .../functions/runtime/thread/ThreadRuntime.java    |  1 -
 .../connect/schema/KafkaSchemaWrappedSchema.java   |  3 +-
 .../apache/pulsar/io/kafka/AvroSchemaCache.java    |  3 +-
 .../apache/pulsar/io/kafka/KafkaBytesSource.java   |  3 +-
 pulsar-sql/presto-distribution/LICENSE             | 37 +++++----
 .../org/apache/pulsar/sql/presto/PulsarSplit.java  |  3 +-
 .../pulsar/sql/presto/TestPulsarMetadata.java      | 15 ++--
 .../decoder/primitive/TestPrimitiveDecoder.java    | 25 +++---
 tests/docker-images/java-test-functions/pom.xml    | 18 +----
 .../io/sources/GenericRecordSourceTest.java        | 94 ++++++++++++++--------
 62 files changed, 448 insertions(+), 305 deletions(-)
 rename {pulsar-common/src/main/java/org/apache/pulsar/common/protocol => pulsar-client/src/main/java/org/apache/pulsar/client/impl}/schema/SchemaInfoUtil.java (58%)
 rename {pulsar-client => pulsar-common}/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java (96%)
 copy pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java => pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoImpl.java (83%)
 rename {pulsar-client => pulsar-common}/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java (100%)
 create mode 100644 pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java

[pulsar] 03/03: Remove the unwanted dependencies in the pulsar function's instance jar and make SchemaInfo an interface (#10878)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 89ac98e4af363b09f2fe8e309539b0e35243aaee
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sat Jun 12 00:25:37 2021 -0700

    Remove the unwanted dependencies in the pulsar function's instance jar and make SchemaInfo an interface (#10878)
    
    ### Motivation
    
    The java-instance.jar generated by the pulsar-functions-runtime-all module should only contain interfaces that the Pulsar Functions framework uses to interact with user code.  The module should only have the following dependencies:
        1. pulsar-io-core
        2. pulsar-functions-api
        3. pulsar-client-api
        4. slf4j-api
        5. log4j-slf4j-impl
        6. log4j-api
        7. log4j-core
    
    *Explain here the context, and why you're making that change. What is the problem you're trying to solve.*
    
    ### Modifications
    
    Change dep pulsar-client-original to pulsar-client-api
    
    Slight changes in the top level pom for what is included in all sub-modules so that additional deps don't land into java-instance.jar
    
    There is also a fix for an issue introduced by https://github.com/apache/pulsar/pull/9673. The thread context class loader was set incorrectly in ThreadRuntime.
    
    ### Future improvements
    
    1. We should also add a test in the future to make sure external libraries don't get added accidentally to this module and java-instance.jar
    
    2. Rename the module "pulsar-functions-runtime-all" to something that describes its function better.  The current name can be confusing.
    
    
    (cherry picked from commit d81b5f8b8e6cb17f307ec830accaf9dd95d7643b)
---
 distribution/server/pom.xml                        |  6 ++
 pom.xml                                            |  9 +--
 .../apache/pulsar/broker/service/ServerCnx.java    |  2 +-
 .../admin/AdminApiSchemaValidationEnforced.java    | 34 ++++----
 .../schema/JsonSchemaCompatibilityCheckTest.java   | 12 +--
 .../java/org/apache/pulsar/schema/SchemaTest.java  |  5 +-
 .../SchemaCompatibilityCheckTest.java              |  3 +-
 .../pulsar/client/admin/internal/SchemasImpl.java  | 15 ++--
 .../apache/pulsar/common/schema/SchemaInfo.java    | 52 ++----------
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  2 +-
 .../pulsar/client/impl/HttpLookupService.java      |  2 +-
 .../client/impl/schema/AutoProduceBytesSchema.java |  7 +-
 .../pulsar/client/impl/schema/BooleanSchema.java   |  2 +-
 .../pulsar/client/impl/schema/ByteBufSchema.java   |  2 +-
 .../client/impl/schema/ByteBufferSchema.java       |  2 +-
 .../pulsar/client/impl/schema/ByteSchema.java      |  2 +-
 .../pulsar/client/impl/schema/BytesSchema.java     |  2 +-
 .../pulsar/client/impl/schema/DateSchema.java      |  2 +-
 .../pulsar/client/impl/schema/DoubleSchema.java    |  2 +-
 .../pulsar/client/impl/schema/FloatSchema.java     |  2 +-
 .../pulsar/client/impl/schema/InstantSchema.java   |  2 +-
 .../pulsar/client/impl/schema/IntSchema.java       |  2 +-
 .../pulsar/client/impl/schema/JSONSchema.java      | 10 +--
 .../pulsar/client/impl/schema/LocalDateSchema.java |  2 +-
 .../client/impl/schema/LocalDateTimeSchema.java    |  2 +-
 .../pulsar/client/impl/schema/LocalTimeSchema.java |  2 +-
 .../pulsar/client/impl/schema/LongSchema.java      |  2 +-
 .../client/impl/schema/ProtobufNativeSchema.java   |  7 +-
 .../pulsar/client/impl/schema/ProtobufSchema.java  |  7 +-
 .../impl/schema/RecordSchemaBuilderImpl.java       |  2 +-
 .../pulsar/client/impl}/schema/SchemaInfoUtil.java | 48 ++++++-----
 .../pulsar/client/impl/schema/ShortSchema.java     |  2 +-
 .../pulsar/client/impl/schema/StringSchema.java    |  4 +-
 .../pulsar/client/impl/schema/StructSchema.java    |  2 +-
 .../pulsar/client/impl/schema/TimeSchema.java      |  2 +-
 .../pulsar/client/impl/schema/TimestampSchema.java |  2 +-
 .../pulsar/client/impl/schema/util/SchemaUtil.java |  3 +-
 .../src/main/resources/findbugsExclude.xml         | 16 ++++
 .../client/impl/schema/KeyValueSchemaInfoTest.java |  2 +-
 .../client/impl/schema/KeyValueSchemaTest.java     | 31 ++++---
 .../pulsar/client/impl/schema/SchemaInfoTest.java  | 16 ++--
 .../client/impl/schema/StringSchemaTest.java       |  4 +-
 pulsar-common/pom.xml                              |  4 +
 .../admin/internal/data/AuthPoliciesImpl.java      | 20 +++--
 .../client/impl/schema/KeyValueSchemaInfo.java     | 15 ++--
 .../pulsar/client/impl/schema/SchemaInfoImpl.java  | 18 ++---
 .../pulsar/client/impl/schema/SchemaUtils.java     |  0
 .../pulsar/common/protocol/schema/SchemaData.java  |  3 +-
 pulsar-functions/runtime-all/pom.xml               | 30 ++++++-
 .../functions/instance/JavaInstanceMain.java       |  1 +
 .../functions/instance/JavaInstanceDepsTest.java   | 77 ++++++++++++++++++
 .../functions/runtime/thread/ThreadRuntime.java    |  1 -
 .../connect/schema/KafkaSchemaWrappedSchema.java   |  3 +-
 .../apache/pulsar/io/kafka/AvroSchemaCache.java    |  3 +-
 .../apache/pulsar/io/kafka/KafkaBytesSource.java   |  3 +-
 pulsar-sql/presto-distribution/LICENSE             |  1 -
 .../org/apache/pulsar/sql/presto/PulsarSplit.java  |  3 +-
 .../pulsar/sql/presto/TestPulsarMetadata.java      | 15 ++--
 .../decoder/primitive/TestPrimitiveDecoder.java    | 25 +++---
 tests/docker-images/java-test-functions/pom.xml    | 18 +----
 .../io/sources/GenericRecordSourceTest.java        | 94 ++++++++++++++--------
 61 files changed, 406 insertions(+), 263 deletions(-)

diff --git a/distribution/server/pom.xml b/distribution/server/pom.xml
index 57ae871..27d12c4 100644
--- a/distribution/server/pom.xml
+++ b/distribution/server/pom.xml
@@ -65,6 +65,12 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper-prometheus-metrics</artifactId>
+      <version>${zookeeper.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-package-bookkeeper-storage</artifactId>
       <version>${project.version}</version>
diff --git a/pom.xml b/pom.xml
index f4ce36c..c98a55a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1199,13 +1199,12 @@ flexible messaging model and an intuitive client API.</description>
           <groupId>com.fasterxml.jackson.core</groupId>
           <artifactId>*</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
-    <dependency>
-      <groupId>org.apache.zookeeper</groupId>
-      <artifactId>zookeeper-prometheus-metrics</artifactId>
-      <version>${zookeeper.version}</version>
-    </dependency>
   </dependencies>
 
   <build>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 2f3d32d..4445f64 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -83,6 +83,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.schema.SchemaInfoUtil;
 import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.CommandAck;
@@ -135,7 +136,6 @@ import org.apache.pulsar.common.protocol.CommandUtils;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.PulsarHandler;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
-import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.FutureUtil;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java
index 1ebf7f1..3daf920 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -97,11 +98,12 @@ public class AdminApiSchemaValidationEnforced extends MockedPulsarServiceBaseTes
             assertTrue(e.getMessage().contains("HTTP 404 Not Found"));
         }
         Map<String, String> properties = Maps.newHashMap();
-        SchemaInfo schemaInfo = new SchemaInfo();
-        schemaInfo.setType(SchemaType.STRING);
-        schemaInfo.setProperties(properties);
-        schemaInfo.setName("test");
-        schemaInfo.setSchema("".getBytes());
+        SchemaInfo schemaInfo = SchemaInfoImpl.builder()
+                .type(SchemaType.STRING)
+                .properties(properties)
+                .name("test")
+                .schema("".getBytes())
+                .build();
         PostSchemaPayload postSchemaPayload = new PostSchemaPayload("STRING", "", properties);
         admin.schemas().createSchema(topicName, postSchemaPayload);
         try (Producer p = pulsarClient.newProducer().topic(topicName).create()) {
@@ -145,11 +147,12 @@ public class AdminApiSchemaValidationEnforced extends MockedPulsarServiceBaseTes
         }
         Map<String, String> properties = Maps.newHashMap();
         properties.put("key1", "value1");
-        SchemaInfo schemaInfo = new SchemaInfo();
-        schemaInfo.setType(SchemaType.STRING);
-        schemaInfo.setProperties(properties);
-        schemaInfo.setName("test");
-        schemaInfo.setSchema("".getBytes());
+        SchemaInfo schemaInfo = SchemaInfoImpl.builder()
+                .type(SchemaType.STRING)
+                .properties(properties)
+                .name("test")
+                .schema("".getBytes())
+                .build();
         PostSchemaPayload postSchemaPayload = new PostSchemaPayload("STRING", "", properties);
         admin.schemas().createSchema(topicName, postSchemaPayload);
         try (Producer p = pulsarClient.newProducer().topic(topicName).create()) {
@@ -174,11 +177,12 @@ public class AdminApiSchemaValidationEnforced extends MockedPulsarServiceBaseTes
         }
         admin.namespaces().setSchemaValidationEnforced(namespace,true);
         Map<String, String> properties = Maps.newHashMap();
-        SchemaInfo schemaInfo = new SchemaInfo();
-        schemaInfo.setType(SchemaType.STRING);
-        schemaInfo.setProperties(properties);
-        schemaInfo.setName("test");
-        schemaInfo.setSchema("".getBytes());
+        SchemaInfo schemaInfo = SchemaInfoImpl.builder()
+                .type(SchemaType.STRING)
+                .properties(properties)
+                .name("test")
+                .schema("".getBytes())
+                .build();
         PostSchemaPayload postSchemaPayload = new PostSchemaPayload("STRING", "", properties);
         admin.schemas().createSchema(topicName, postSchemaPayload);
         try (Producer<String> p = pulsarClient.newProducer(Schema.STRING).topic(topicName).create()) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
index 04914fe..32a9f9e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -118,11 +119,12 @@ public class JsonSchemaCompatibilityCheckTest extends BaseAvroSchemaCompatibilit
             JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(mapper);
             JsonSchema schema = schemaGen.generateSchema(pojo);
 
-            SchemaInfo info = new SchemaInfo();
-            info.setName("");
-            info.setProperties(properties);
-            info.setType(SchemaType.JSON);
-            info.setSchema(mapper.writeValueAsBytes(schema));
+            SchemaInfo info = SchemaInfoImpl.builder()
+                    .name("")
+                    .properties(properties)
+                    .type(SchemaType.JSON)
+                    .schema(mapper.writeValueAsBytes(schema))
+                    .build();
             return new OldJSONSchema<>(info, pojo, mapper);
         }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index d97b989..6e860ad 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -61,6 +61,7 @@ import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
@@ -437,7 +438,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
         admin.topics().createPartitionedTopic(topic, 2);
 
         // set schema
-        SchemaInfo schemaInfo = SchemaInfo
+        SchemaInfo schemaInfo = SchemaInfoImpl
                 .builder()
                 .schema(new byte[0])
                 .name("dummySchema")
@@ -653,7 +654,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
         final Map<String, String> map = new HashMap<>();
         map.put("key", null);
         map.put(null, "value"); // null key is not allowed for JSON, it's only for test here
-        Schema.INT32.getSchemaInfo().setProperties(map);
+        ((SchemaInfoImpl)Schema.INT32.getSchemaInfo()).setProperties(map);
 
         final Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32).topic(topic)
                 .subscriptionName("sub")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
index d6d96f7..61d8332 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
@@ -35,6 +35,7 @@ import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
@@ -323,7 +324,7 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest {
                 SchemaCompatibilityStrategy.FULL);
         byte[] changeSchemaBytes = (new String(Schema.AVRO(Schemas.PersonOne.class)
                 .getSchemaInfo().getSchema(), UTF_8) + "/n   /n   /n").getBytes();
-        SchemaInfo schemaInfo = SchemaInfo.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build();
+        SchemaInfo schemaInfo = SchemaInfoImpl.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build();
         admin.schemas().createSchema(fqtn, schemaInfo);
 
         admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
index 9a9a4ed..4408ae2 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
@@ -31,6 +31,7 @@ import javax.ws.rs.client.WebTarget;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Schemas;
 import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
@@ -434,7 +435,7 @@ public class SchemasImpl extends BaseResource implements Schemas {
     // the util function converts `GetSchemaResponse` to `SchemaInfo`
     static SchemaInfo convertGetSchemaResponseToSchemaInfo(TopicName tn,
                                                            GetSchemaResponse response) {
-        SchemaInfo info = new SchemaInfo();
+
         byte[] schema;
         if (response.getType() == SchemaType.KEY_VALUE) {
             schema = DefaultImplementation.convertKeyValueDataStringToSchemaInfoSchema(
@@ -442,11 +443,13 @@ public class SchemasImpl extends BaseResource implements Schemas {
         } else {
             schema = response.getData().getBytes(UTF_8);
         }
-        info.setSchema(schema);
-        info.setType(response.getType());
-        info.setProperties(response.getProperties());
-        info.setName(tn.getLocalName());
-        return info;
+
+        return SchemaInfoImpl.builder()
+                .schema(schema)
+                .type(response.getType())
+                .properties(response.getProperties())
+                .name(tn.getLocalName())
+                .build();
     }
 
     static SchemaInfoWithVersion convertGetSchemaResponseToSchemaInfoWithVersion(TopicName tn,
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
index f2c5860..0070c4c 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
@@ -18,16 +18,7 @@
  */
 package org.apache.pulsar.common.schema;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-import java.util.Base64;
-import java.util.Collections;
 import java.util.Map;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.NoArgsConstructor;
-import lombok.experimental.Accessors;
 import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
@@ -37,55 +28,24 @@ import org.apache.pulsar.common.classification.InterfaceStability;
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-@Data
-@AllArgsConstructor
-@NoArgsConstructor
-@Accessors(chain = true)
-@Builder
-public class SchemaInfo {
+public interface SchemaInfo {
 
-    @EqualsAndHashCode.Exclude
-    private String name;
+    String getName();
 
     /**
      * The schema data in AVRO JSON format.
      */
-    private byte[] schema;
+    byte[] getSchema();
 
     /**
      * The type of schema (AVRO, JSON, PROTOBUF, etc..).
      */
-    private SchemaType type;
+    SchemaType getType();
 
     /**
      * Additional properties of the schema definition (implementation defined).
      */
-    @Builder.Default
-    private Map<String, String> properties = Collections.emptyMap();
-
-    public String getSchemaDefinition() {
-        if (null == schema) {
-            return "";
-        }
-
-        switch (type) {
-            case AVRO:
-            case JSON:
-            case PROTOBUF:
-            case PROTOBUF_NATIVE:
-                return new String(schema, UTF_8);
-            case KEY_VALUE:
-                KeyValue<SchemaInfo, SchemaInfo> schemaInfoKeyValue =
-                    DefaultImplementation.decodeKeyValueSchemaInfo(this);
-                return DefaultImplementation.jsonifyKeyValueSchemaInfo(schemaInfoKeyValue);
-            default:
-                return Base64.getEncoder().encodeToString(schema);
-        }
-    }
-
-    @Override
-    public String toString(){
-        return DefaultImplementation.jsonifySchemaInfo(this);
-    }
+    Map<String, String> getProperties();
 
+    String getSchemaDefinition();
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index a7cc68e..bed2b9c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -90,7 +90,7 @@ import org.apache.pulsar.common.api.proto.CommandSuccess;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.PulsarHandler;
-import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
+import org.apache.pulsar.client.impl.schema.SchemaInfoUtil;
 import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.util.FutureUtil;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 9108a6e..f2cc169 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -47,7 +47,7 @@ import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.schema.SchemaInfo;
-import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
+import org.apache.pulsar.client.impl.schema.SchemaInfoUtil;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.FutureUtil;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
index 3db9554..8971aab 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl.schema;
 import static com.google.common.base.Preconditions.checkState;
 
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -74,9 +75,9 @@ public class AutoProduceBytesSchema<T> implements Schema<byte[]> {
 
         if (requireSchemaValidation) {
             // verify if the message can be decoded by the underlying schema
-            if (schema instanceof KeyValueSchemaImpl
-                    && ((KeyValueSchemaImpl) schema).getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) {
-                ((KeyValueSchemaImpl) schema).getValueSchema().validate(message);
+            if (schema instanceof KeyValueSchema
+                    && ((KeyValueSchema) schema).getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) {
+                ((KeyValueSchema) schema).getValueSchema().validate(message);
             } else {
                 schema.validate(message);
             }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java
index c66ff43..3b5296e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BooleanSchema.java
@@ -32,7 +32,7 @@ public class BooleanSchema extends AbstractSchema<Boolean> {
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfo()
+        SCHEMA_INFO = new SchemaInfoImpl()
                 .setName("Boolean")
                 .setType(SchemaType.BOOLEAN)
                 .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java
index 658e398..ce68298 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufSchema.java
@@ -33,7 +33,7 @@ public class ByteBufSchema extends AbstractSchema<ByteBuf> {
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfo()
+        SCHEMA_INFO = new SchemaInfoImpl()
             .setName("ByteBuf")
             .setType(SchemaType.BYTES)
             .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
index c560f0e..0ff308f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteBufferSchema.java
@@ -34,7 +34,7 @@ public class ByteBufferSchema extends AbstractSchema<ByteBuffer> {
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfo()
+        SCHEMA_INFO = new SchemaInfoImpl()
             .setName("ByteBuffer")
             .setType(SchemaType.BYTES)
             .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
index 4e4c27e..6d51687 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ByteSchema.java
@@ -32,7 +32,7 @@ public class ByteSchema extends AbstractSchema<Byte> {
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfo()
+        SCHEMA_INFO = new SchemaInfoImpl()
             .setName("INT8")
             .setType(SchemaType.INT8)
             .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
index 9c7ec37..98a0e66 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/BytesSchema.java
@@ -31,7 +31,7 @@ public class BytesSchema extends AbstractSchema<byte[]> {
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfo()
+        SCHEMA_INFO = new SchemaInfoImpl()
             .setName("Bytes")
             .setType(SchemaType.BYTES)
             .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java
index 295dae6..cbdb912 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DateSchema.java
@@ -33,7 +33,7 @@ public class DateSchema extends AbstractSchema<Date> {
    private static final SchemaInfo SCHEMA_INFO;
 
    static {
-       SCHEMA_INFO = new SchemaInfo()
+       SCHEMA_INFO = new SchemaInfoImpl()
              .setName("Date")
              .setType(SchemaType.DATE)
              .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
index baa1aac..4b269a6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/DoubleSchema.java
@@ -32,7 +32,7 @@ public class DoubleSchema extends AbstractSchema<Double> {
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfo()
+        SCHEMA_INFO = new SchemaInfoImpl()
             .setName("Double")
             .setType(SchemaType.DOUBLE)
             .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
index aed905b..84d4073 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/FloatSchema.java
@@ -32,7 +32,7 @@ public class FloatSchema extends AbstractSchema<Float> {
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfo()
+        SCHEMA_INFO = new SchemaInfoImpl()
                 .setName("Float")
                 .setType(SchemaType.FLOAT)
                 .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java
index 5830cea..db33de7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/InstantSchema.java
@@ -33,7 +33,7 @@ public class InstantSchema extends AbstractSchema<Instant> {
    private static final SchemaInfo SCHEMA_INFO;
 
    static {
-       SCHEMA_INFO = new SchemaInfo()
+       SCHEMA_INFO = new SchemaInfoImpl()
              .setName("Instant")
              .setType(SchemaType.INSTANT)
              .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
index fc8338e..dfad280 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/IntSchema.java
@@ -32,7 +32,7 @@ public class IntSchema extends AbstractSchema<Integer> {
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfo()
+        SCHEMA_INFO = new SchemaInfoImpl()
             .setName("INT32")
             .setType(SchemaType.INT32)
             .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
index 4e3b874..9fe6aed 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
@@ -74,11 +74,11 @@ public class JSONSchema<T> extends AvroBaseStructSchema<T> {
             ObjectMapper objectMapper = new ObjectMapper();
             JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(objectMapper);
             JsonSchema jsonBackwardsCompatibleSchema = schemaGen.generateSchema(pojo);
-            backwardsCompatibleSchemaInfo = new SchemaInfo();
-            backwardsCompatibleSchemaInfo.setName("");
-            backwardsCompatibleSchemaInfo.setProperties(schemaInfo.getProperties());
-            backwardsCompatibleSchemaInfo.setType(SchemaType.JSON);
-            backwardsCompatibleSchemaInfo.setSchema(objectMapper.writeValueAsBytes(jsonBackwardsCompatibleSchema));
+            backwardsCompatibleSchemaInfo = new SchemaInfoImpl()
+                    .setName("")
+                    .setProperties(schemaInfo.getProperties())
+                    .setType(SchemaType.JSON)
+                    .setSchema(objectMapper.writeValueAsBytes(jsonBackwardsCompatibleSchema));
         } catch (JsonProcessingException ex) {
             throw new RuntimeException(ex);
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java
index add6fd2..18ef3af 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateSchema.java
@@ -32,7 +32,7 @@ public class LocalDateSchema extends AbstractSchema<LocalDate> {
    private static final SchemaInfo SCHEMA_INFO;
 
    static {
-       SCHEMA_INFO = new SchemaInfo()
+       SCHEMA_INFO = new SchemaInfoImpl()
              .setName("LocalDate")
              .setType(SchemaType.LOCAL_DATE)
              .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java
index aa86a19..05b2787 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalDateTimeSchema.java
@@ -37,7 +37,7 @@ public class LocalDateTimeSchema extends AbstractSchema<LocalDateTime> {
    public static final String DELIMITER = ":";
 
    static {
-       SCHEMA_INFO = new SchemaInfo()
+       SCHEMA_INFO = new SchemaInfoImpl()
              .setName("LocalDateTime")
              .setType(SchemaType.LOCAL_DATE_TIME)
              .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java
index 6e2bf62..e53c620 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LocalTimeSchema.java
@@ -32,7 +32,7 @@ public class LocalTimeSchema extends AbstractSchema<LocalTime> {
    private static final SchemaInfo SCHEMA_INFO;
 
    static {
-       SCHEMA_INFO = new SchemaInfo()
+       SCHEMA_INFO = new SchemaInfoImpl()
              .setName("LocalTime")
              .setType(SchemaType.LOCAL_TIME)
              .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
index f1491f4..deccaf4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/LongSchema.java
@@ -32,7 +32,7 @@ public class LongSchema extends AbstractSchema<Long> {
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfo()
+        SCHEMA_INFO = new SchemaInfoImpl()
             .setName("INT64")
             .setType(SchemaType.INT64)
             .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java
index 385fc41..9cf753c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchema.java
@@ -72,11 +72,10 @@ public class ProtobufNativeSchema<T extends GeneratedMessageV3> extends Abstract
         setReader(new ProtobufNativeReader<>(protoMessageInstance));
         setWriter(new ProtobufNativeWriter<>());
         // update properties with protobuf related properties
-        Map<String, String> allProperties = new HashMap<>();
-        allProperties.putAll(schemaInfo.getProperties());
         // set protobuf parsing info
+        Map<String, String> allProperties = new HashMap<>(schemaInfo.getProperties());
         allProperties.put(PARSING_INFO_PROPERTY, getParsingInfo(protoMessageInstance));
-        schemaInfo.setProperties(allProperties);
+        ((SchemaInfoImpl)schemaInfo).setProperties(allProperties);
     }
 
     private String getParsingInfo(T protoMessageInstance) {
@@ -124,7 +123,7 @@ public class ProtobufNativeSchema<T extends GeneratedMessageV3> extends Abstract
         }
         Descriptors.Descriptor descriptor = createProtobufNativeSchema(schemaDefinition.getPojo());
 
-        SchemaInfo schemaInfo = SchemaInfo.builder()
+        SchemaInfo schemaInfo = SchemaInfoImpl.builder()
                 .schema(ProtobufNativeSchemaUtils.serialize(descriptor))
                 .type(SchemaType.PROTOBUF_NATIVE)
                 .name("")
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
index f7971eb..275cacd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufSchema.java
@@ -69,11 +69,10 @@ public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> ex
         setReader(new ProtobufReader<>(protoMessageInstance));
         setWriter(new ProtobufWriter<>());
         // update properties with protobuf related properties
-        Map<String, String> allProperties = new HashMap<>();
-        allProperties.putAll(schemaInfo.getProperties());
         // set protobuf parsing info
+        Map<String, String> allProperties = new HashMap<>(schemaInfo.getProperties());
         allProperties.put(PARSING_INFO_PROPERTY, getParsingInfo(protoMessageInstance));
-        schemaInfo.setProperties(allProperties);
+        ((SchemaInfoImpl)schemaInfo).setProperties(allProperties);
     }
 
     private String getParsingInfo(T protoMessageInstance) {
@@ -111,7 +110,7 @@ public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> ex
                     + " is not assignable from " + pojo.getName());
         }
 
-            SchemaInfo schemaInfo = SchemaInfo.builder()
+            SchemaInfo schemaInfo = SchemaInfoImpl.builder()
                     .schema(createProtobufAvroSchema(schemaDefinition.getPojo()).toString().getBytes(UTF_8))
                     .type(SchemaType.PROTOBUF)
                     .name("")
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java
index ee9f0cb..0fda7d5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/RecordSchemaBuilderImpl.java
@@ -105,7 +105,7 @@ public class RecordSchemaBuilderImpl implements RecordSchemaBuilder {
         }
 
         baseSchema.setFields(avroFields);
-        return new SchemaInfo(
+        return new SchemaInfoImpl(
             name,
             baseSchema.toString().getBytes(UTF_8),
             schemaType,
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaInfoUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoUtil.java
similarity index 58%
rename from pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaInfoUtil.java
rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoUtil.java
index ac5997d..fb5263e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaInfoUtil.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoUtil.java
@@ -16,16 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.common.protocol.schema;
+package org.apache.pulsar.client.impl.schema;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
+import java.util.Map;
 import java.util.TreeMap;
 import lombok.experimental.UtilityClass;
 
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.api.proto.KeyValue;
 import org.apache.pulsar.common.api.proto.Schema;
 import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.schema.GetSchemaResponse;
+import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.schema.SchemaInfo;
 
 /**
@@ -35,37 +39,39 @@ import org.apache.pulsar.common.schema.SchemaInfo;
 public class SchemaInfoUtil {
 
     public static SchemaInfo newSchemaInfo(String name, SchemaData data) {
-        SchemaInfo si = new SchemaInfo();
-        si.setName(name);
-        si.setSchema(data.getData());
-        si.setType(data.getType());
-        si.setProperties(data.getProps());
-        return si;
+        return SchemaInfoImpl.builder()
+                .name(name)
+                .schema(data.getData())
+                .type(data.getType())
+                .properties(data.getProps())
+                .build();
     }
 
     public static SchemaInfo newSchemaInfo(Schema schema) {
-        SchemaInfo si = new SchemaInfo();
-        si.setName(schema.getName());
-        si.setSchema(schema.getSchemaData());
-        si.setType(Commands.getSchemaType(schema.getType()));
+        SchemaInfoImpl.SchemaInfoImplBuilder si = SchemaInfoImpl.builder()
+                .name(schema.getName())
+                .schema(schema.getSchemaData())
+                .type(Commands.getSchemaType(schema.getType()));
         if (schema.getPropertiesCount() == 0) {
-            si.setProperties(Collections.emptyMap());
+            si.properties(Collections.emptyMap());
         } else {
-            si.setProperties(new TreeMap<>());
+            Map<String, String> properties = new TreeMap<>();
             for (int i = 0; i < schema.getPropertiesCount(); i++) {
                 KeyValue kv = schema.getPropertyAt(i);
-                si.getProperties().put(kv.getKey(), kv.getValue());
+                properties.put(kv.getKey(), kv.getValue());
             }
+
+            si.properties(properties);
         }
-        return si;
+        return si.build();
     }
 
     public static SchemaInfo newSchemaInfo(String name, GetSchemaResponse schema) {
-        SchemaInfo si = new SchemaInfo();
-        si.setName(name);
-        si.setSchema(schema.getData().getBytes(StandardCharsets.UTF_8));
-        si.setType(schema.getType());
-        si.setProperties(schema.getProperties());
-        return si;
+        return SchemaInfoImpl.builder()
+                .name(name)
+                .schema(schema.getData().getBytes(StandardCharsets.UTF_8))
+                .type(schema.getType())
+                .properties(schema.getProperties())
+                .build();
     }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
index 4014405..bbb5ad6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ShortSchema.java
@@ -32,7 +32,7 @@ public class ShortSchema extends AbstractSchema<Short> {
     private static final SchemaInfo SCHEMA_INFO;
 
     static {
-        SCHEMA_INFO = new SchemaInfo()
+        SCHEMA_INFO = new SchemaInfoImpl()
             .setName("INT16")
             .setType(SchemaType.INT16)
             .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
index 7e57f6c..462fa60 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StringSchema.java
@@ -46,7 +46,7 @@ public class StringSchema extends AbstractSchema<String> {
         // Ensure the ordering of the static initialization
         CHARSET_KEY = "__charset";
         DEFAULT_CHARSET = StandardCharsets.UTF_8;
-        DEFAULT_SCHEMA_INFO = new SchemaInfo()
+        DEFAULT_SCHEMA_INFO = new SchemaInfoImpl()
                 .setName("String")
                 .setType(SchemaType.STRING)
                 .setSchema(new byte[0]);
@@ -87,7 +87,7 @@ public class StringSchema extends AbstractSchema<String> {
         this.charset = charset;
         Map<String, String> properties = new HashMap<>();
         properties.put(CHARSET_KEY, charset.name());
-        this.schemaInfo = new SchemaInfo()
+        this.schemaInfo = new SchemaInfoImpl()
                 .setName(DEFAULT_SCHEMA_INFO.getName())
                 .setType(SchemaType.STRING)
                 .setSchema(DEFAULT_SCHEMA_INFO.getSchema())
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
index 8cc1868..7ba116f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
@@ -105,7 +105,7 @@ public abstract class StructSchema<T> extends AbstractStructSchema<T> {
     }
 
     public static <T> SchemaInfo parseSchemaInfo(SchemaDefinition<T> schemaDefinition, SchemaType schemaType) {
-        return SchemaInfo.builder()
+        return SchemaInfoImpl.builder()
                 .schema(createAvroSchema(schemaDefinition).toString().getBytes(UTF_8))
                 .properties(schemaDefinition.getProperties())
                 .name("")
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java
index d56e4da..ab6e1ad 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimeSchema.java
@@ -33,7 +33,7 @@ public class TimeSchema extends AbstractSchema<Time> {
    private static final SchemaInfo SCHEMA_INFO;
 
    static {
-       SCHEMA_INFO = new SchemaInfo()
+       SCHEMA_INFO = new SchemaInfoImpl()
              .setName("Time")
              .setType(SchemaType.TIME)
              .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java
index 899e159..755b466 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/TimestampSchema.java
@@ -33,7 +33,7 @@ public class TimestampSchema extends AbstractSchema<Timestamp> {
    private static final SchemaInfo SCHEMA_INFO;
 
    static {
-       SCHEMA_INFO = new SchemaInfo()
+       SCHEMA_INFO = new SchemaInfoImpl()
              .setName("Timestamp")
              .setType(SchemaType.TIMESTAMP)
              .setSchema(new byte[0]);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java
index 7e1e1c0..70d7fc0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/util/SchemaUtil.java
@@ -23,6 +23,7 @@ import org.apache.avro.reflect.ReflectData;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.SchemaDefinitionBuilderImpl;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
@@ -47,7 +48,7 @@ public class SchemaUtil {
     }
 
     public static <T> SchemaInfo parseSchemaInfo(SchemaDefinition<T> schemaDefinition, SchemaType schemaType) {
-        return SchemaInfo.builder()
+        return SchemaInfoImpl.builder()
                 .schema(createAvroSchema(schemaDefinition).toString().getBytes(UTF_8))
                 .properties(schemaDefinition.getProperties())
                 .name("")
diff --git a/pulsar-client/src/main/resources/findbugsExclude.xml b/pulsar-client/src/main/resources/findbugsExclude.xml
index bf01926..a37c886 100644
--- a/pulsar-client/src/main/resources/findbugsExclude.xml
+++ b/pulsar-client/src/main/resources/findbugsExclude.xml
@@ -69,6 +69,22 @@
         <Class name="org.apache.pulsar.client.impl.schema.BooleanSchema"/>
         <Bug pattern="NP_BOOLEAN_RETURN_NULL"/>
     </Match>
+    
+    <Match>
+        <Class name="org.apache.pulsar.client.impl.schema.SchemaInfoImpl"/>
+        <Bug pattern="EI_EXPOSE_REP"/>
+    </Match>
+
+    <Match>
+        <Class name="org.apache.pulsar.client.impl.schema.SchemaInfoImpl"/>
+        <Bug pattern="EI_EXPOSE_REP2"/>
+    </Match>
+
+    <Match>
+      <Class name="org.apache.pulsar.client.impl.schema.SchemaInfoImpl$SchemaInfoImplBuilder"/>
+      <Bug pattern="EI_EXPOSE_REP2"/>
+    </Match>
+    
 
     <Match>
         <Class name="~org.apache.pulsar.client.impl.ConsumerImpl.*"/>
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java
index 994f013..9474b80 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfoTest.java
@@ -171,7 +171,7 @@ public class KeyValueSchemaInfoTest {
             KeyValueEncodingType.SEPARATED
         );
 
-        SchemaInfo oldSchemaInfo = new SchemaInfo()
+        SchemaInfo oldSchemaInfo = new SchemaInfoImpl()
             .setName("")
             .setType(SchemaType.KEY_VALUE)
             .setSchema(kvSchema.getSchemaInfo().getSchema())
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
index b0a86c2..bd94bf0 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Color;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -67,28 +68,32 @@ public class KeyValueSchemaTest {
 
     @Test
     public void testFillParametersToSchemainfo() {
-        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
-        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
-
-        fooSchema.getSchemaInfo().setName("foo");
-        fooSchema.getSchemaInfo().setType(SchemaType.AVRO);
         Map<String, String> keyProperties = Maps.newTreeMap();
         keyProperties.put("foo.key1", "value");
         keyProperties.put("foo.key2", "value");
-        fooSchema.getSchemaInfo().setProperties(keyProperties);
-        barSchema.getSchemaInfo().setName("bar");
-        barSchema.getSchemaInfo().setType(SchemaType.AVRO);
+
         Map<String, String> valueProperties = Maps.newTreeMap();
         valueProperties.put("bar.key", "key");
-        barSchema.getSchemaInfo().setProperties(valueProperties);
+
+        AvroSchema<Foo> fooSchema = AvroSchema.of(
+                SchemaDefinition.<Foo>builder()
+                        .withPojo(Foo.class)
+                        .withProperties(keyProperties)
+                        .build());
+        AvroSchema<Bar> barSchema = AvroSchema.of(
+                SchemaDefinition.<Bar>builder()
+                        .withPojo(Bar.class)
+                        .withProperties(valueProperties)
+                        .build());
+
         Schema<KeyValue<Foo, Bar>> keyValueSchema1 = Schema.KeyValue(fooSchema, barSchema);
 
-        assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("key.schema.name"), "foo");
         assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("key.schema.type"), String.valueOf(SchemaType.AVRO));
-        assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("key.schema.properties"), "{\"foo.key1\":\"value\",\"foo.key2\":\"value\"}");
-        assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("value.schema.name"), "bar");
+        assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("key.schema.properties"),
+                "{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\",\"foo.key1\":\"value\",\"foo.key2\":\"value\"}");
         assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("value.schema.type"), String.valueOf(SchemaType.AVRO));
-        assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("value.schema.properties"), "{\"bar.key\":\"key\"}");
+        assertEquals(keyValueSchema1.getSchemaInfo().getProperties().get("value.schema.properties"),
+                "{\"__alwaysAllowNull\":\"true\",\"__jsr310ConversionEnabled\":\"false\",\"bar.key\":\"key\"}");
     }
 
     @Test
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java
index 7e34a64..f96e84e 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java
@@ -289,7 +289,7 @@ public class SchemaInfoTest {
 
         @Test
         public void testUnsetProperties() {
-            final SchemaInfo schemaInfo = SchemaInfo.builder()
+            final SchemaInfo schemaInfo = SchemaInfoImpl.builder()
                     .type(SchemaType.STRING)
                     .schema(new byte[0])
                     .name("string")
@@ -305,7 +305,7 @@ public class SchemaInfoTest {
         public void testSetProperties() {
             final Map<String, String> map = Maps.newHashMap();
             map.put("test", "value");
-            final SchemaInfo schemaInfo = SchemaInfo.builder()
+            final SchemaInfo schemaInfo = SchemaInfoImpl.builder()
                     .type(SchemaType.STRING)
                     .schema(new byte[0])
                     .name("string")
@@ -322,10 +322,16 @@ public class SchemaInfoTest {
         public void testNullPropertyValue() {
             final Map<String, String> map = new HashMap<>();
             map.put("key", null);
-            final IntSchema schema = new IntSchema();
-            schema.getSchemaInfo().setProperties(map);
+
+            SchemaInfo si = SchemaInfoImpl.builder()
+                    .name("INT32")
+                    .schema(new byte[0])
+                    .type(SchemaType.INT32)
+                    .properties(map)
+                    .build();
+
             // null key will be skipped by Gson when serializing JSON to String
-            assertEquals(schema.getSchemaInfo().toString(), INT32_SCHEMA_INFO);
+            assertEquals(si.toString(), INT32_SCHEMA_INFO);
         }
     }
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java
index b09bf4d..b250322 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/StringSchemaTest.java
@@ -87,7 +87,7 @@ public class StringSchemaTest {
 
     @Test
     public void testSchemaInfoWithoutCharset() {
-        SchemaInfo si = new SchemaInfo()
+        SchemaInfo si = new SchemaInfoImpl()
             .setName("test-schema-info-without-charset")
             .setType(SchemaType.STRING)
             .setSchema(new byte[0])
@@ -122,7 +122,7 @@ public class StringSchemaTest {
     public void testSchemaInfoWithCharset(Charset charset) {
         Map<String, String> properties = new HashMap<>();
         properties.put(StringSchema.CHARSET_KEY, charset.name());
-        SchemaInfo si = new SchemaInfo()
+        SchemaInfo si = new SchemaInfoImpl()
             .setName("test-schema-info-without-charset")
             .setType(SchemaType.STRING)
             .setSchema(new byte[0])
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index fd29ce7..05028a0 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -183,6 +183,10 @@
       <version>1.1.7.5</version>
       <scope>test</scope>
     </dependency>
+      <dependency>
+          <groupId>com.google.code.gson</groupId>
+          <artifactId>gson</artifactId>
+      </dependency>
 
   </dependencies>
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/client/admin/internal/data/AuthPoliciesImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/client/admin/internal/data/AuthPoliciesImpl.java
index 0f0429e..c58fef3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/client/admin/internal/data/AuthPoliciesImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/client/admin/internal/data/AuthPoliciesImpl.java
@@ -25,7 +25,6 @@ import java.util.TreeMap;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
-import lombok.Value;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.AuthPolicies;
 
@@ -47,15 +46,17 @@ public final class AuthPoliciesImpl implements AuthPolicies {
         return new AuthPoliciesImplBuilder();
     }
 
-    private static class AuthPoliciesImplBuilder implements AuthPolicies.Builder {
+
+    public static class AuthPoliciesImplBuilder implements AuthPolicies.Builder {
         private Map<String, Set<AuthAction>> namespaceAuthentication = new TreeMap<>();
-        private Map<String, Map<String, Set<AuthAction>>> topicAuthentication = new TreeMap<>();
-        private Map<String, Set<String>> subscriptionAuthentication = new TreeMap<>();
+        private Map<String, Map<String, Set<AuthAction>>> topicAuthentication = new TreeMap<>();;
+        private Map<String, Set<String>> subscriptionAuthentication= new TreeMap<>();;
 
         AuthPoliciesImplBuilder() {
         }
 
-        public AuthPoliciesImplBuilder namespaceAuthentication(Map<String, Set<AuthAction>> namespaceAuthentication) {
+        public AuthPoliciesImplBuilder namespaceAuthentication(
+                Map<String, Set<AuthAction>> namespaceAuthentication) {
             this.namespaceAuthentication = namespaceAuthentication;
             return this;
         }
@@ -66,7 +67,8 @@ public final class AuthPoliciesImpl implements AuthPolicies {
             return this;
         }
 
-        public AuthPoliciesImplBuilder subscriptionAuthentication(Map<String, Set<String>> subscriptionAuthentication) {
+        public AuthPoliciesImplBuilder subscriptionAuthentication(
+                Map<String, Set<String>> subscriptionAuthentication) {
             this.subscriptionAuthentication = subscriptionAuthentication;
             return this;
         }
@@ -74,5 +76,11 @@ public final class AuthPoliciesImpl implements AuthPolicies {
         public AuthPoliciesImpl build() {
             return new AuthPoliciesImpl(namespaceAuthentication, topicAuthentication, subscriptionAuthentication);
         }
+
+        public String toString() {
+            return "AuthPoliciesImpl.AuthPoliciesImplBuilder(namespaceAuthentication=" + this.namespaceAuthentication
+                    + ", topicAuthentication=" + this.topicAuthentication + ", subscriptionAuthentication="
+                    + this.subscriptionAuthentication + ")";
+        }
     }
 }
\ No newline at end of file
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
similarity index 96%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
rename to pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
index 9573526..5f36909 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
@@ -46,7 +46,7 @@ public final class KeyValueSchemaInfo {
 
         @Override
         public SchemaInfo getSchemaInfo() {
-            return BytesSchema.BYTES.getSchemaInfo();
+            return Schema.BYTES.getSchemaInfo();
         }
 
         @Override
@@ -169,11 +169,12 @@ public final class KeyValueSchemaInfo {
         properties.put(KV_ENCODING_TYPE, String.valueOf(keyValueEncodingType));
 
         // generate the final schema info
-        return new SchemaInfo()
-            .setName(schemaName)
-            .setType(SchemaType.KEY_VALUE)
-            .setSchema(schemaData)
-            .setProperties(properties);
+        return SchemaInfoImpl.builder()
+                .name(schemaName)
+                .type(SchemaType.KEY_VALUE)
+                .schema(schemaData)
+                .properties(properties)
+                .build();
     }
 
     private static void encodeSubSchemaInfoToParentSchemaProperties(SchemaInfo schemaInfo,
@@ -237,7 +238,7 @@ public final class KeyValueSchemaInfo {
         } else {
             schemaProps = SchemaUtils.deserializeSchemaProperties(schemaPropsStr);
         }
-        return SchemaInfo.builder()
+        return SchemaInfoImpl.builder()
             .name(schemaName)
             .type(schemaType)
             .schema(schemaData)
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoImpl.java
similarity index 83%
copy from pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
copy to pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoImpl.java
index f2c5860..ca8b6cc 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaInfoImpl.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.common.schema;
+package org.apache.pulsar.client.impl.schema;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import java.util.Base64;
@@ -28,9 +28,11 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 import lombok.experimental.Accessors;
-import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
 
 /**
  * Information about the schema.
@@ -42,7 +44,7 @@ import org.apache.pulsar.common.classification.InterfaceStability;
 @NoArgsConstructor
 @Accessors(chain = true)
 @Builder
-public class SchemaInfo {
+public class SchemaInfoImpl implements SchemaInfo {
 
     @EqualsAndHashCode.Exclude
     private String name;
@@ -75,17 +77,15 @@ public class SchemaInfo {
             case PROTOBUF_NATIVE:
                 return new String(schema, UTF_8);
             case KEY_VALUE:
-                KeyValue<SchemaInfo, SchemaInfo> schemaInfoKeyValue =
-                    DefaultImplementation.decodeKeyValueSchemaInfo(this);
-                return DefaultImplementation.jsonifyKeyValueSchemaInfo(schemaInfoKeyValue);
+                KeyValue<SchemaInfo, SchemaInfo> schemaInfoKeyValue = KeyValueSchemaInfo.decodeKeyValueSchemaInfo(this);
+                return SchemaUtils.jsonifyKeyValueSchemaInfo(schemaInfoKeyValue);
             default:
                 return Base64.getEncoder().encodeToString(schema);
         }
     }
 
     @Override
-    public String toString(){
-        return DefaultImplementation.jsonifySchemaInfo(this);
+    public String toString() {
+        return SchemaUtils.jsonifySchemaInfo(this);
     }
-
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
similarity index 100%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
rename to pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/SchemaUtils.java
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java
index 5c00f06..d5b4405 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 import lombok.Builder;
 import lombok.Data;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
@@ -45,7 +46,7 @@ public class SchemaData {
      * @return the converted schema info.
      */
     public SchemaInfo toSchemaInfo() {
-        return SchemaInfo.builder()
+        return SchemaInfoImpl.builder()
             .name("")
             .type(type)
             .schema(data)
diff --git a/pulsar-functions/runtime-all/pom.xml b/pulsar-functions/runtime-all/pom.xml
index 169985b..bf700783 100644
--- a/pulsar-functions/runtime-all/pom.xml
+++ b/pulsar-functions/runtime-all/pom.xml
@@ -30,6 +30,19 @@
     <relativePath>..</relativePath>
   </parent>
 
+  <!--
+
+    THIS MODULE SHOULD ONLY CONTAIN THE INTERFACES THAT PULSAR FUNCTION'S FRAMEWORK USES TO INTERACT WITH USER CODE.
+    THIS MODULE SHOULD ONLY DEPEND ON:
+    1. pulsar-io-core
+    2. pulsar-functions-api
+    3. pulsar-client-api
+    4. slf4j-api
+    5. log4j-slf4j-impl
+    6. log4j-api
+    7. log4j-core
+  -->
+
   <artifactId>pulsar-functions-runtime-all</artifactId>
   <name>Pulsar Functions :: Runtime All</name>
 
@@ -48,7 +61,7 @@
 
     <dependency>
       <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-client-original</artifactId>
+      <artifactId>pulsar-client-api</artifactId>
       <version>${project.version}</version>
     </dependency>
 
@@ -77,6 +90,19 @@
 
   <build>
     <plugins>
+      <!--
+      Disable the maven-jar-plugin since we don't need the default jar and it conflicts with the maven-assembly-plugin
+      -->
+      <plugin>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>default-jar</id>
+            <phase/>
+          </execution>
+        </executions>
+      </plugin>
+
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-assembly-plugin</artifactId>
@@ -92,7 +118,7 @@
           <execution>
             <id>make-assembly</id>
             <!-- bind to the packaging phase -->
-            <phase>package</phase>
+            <phase>compile</phase>
             <goals>
               <goal>single</goal>
             </goals>
diff --git a/pulsar-functions/runtime-all/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceMain.java b/pulsar-functions/runtime-all/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceMain.java
index bd64bf7..6852792 100644
--- a/pulsar-functions/runtime-all/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceMain.java
+++ b/pulsar-functions/runtime-all/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceMain.java
@@ -36,6 +36,7 @@ import java.util.List;
  * This class will create three classloaders:
  *      1. The root classloader that will share interfaces between the function instance
  *      classloader and user code classloader. This classloader will contain the following dependencies
+ *          - pulsar-io-core
  *          - pulsar-functions-api
  *          - pulsar-client-api
  *          - log4j-slf4j-impl
diff --git a/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java b/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java
new file mode 100644
index 0000000..3bdd23f
--- /dev/null
+++ b/pulsar-functions/runtime-all/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceDepsTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.instance;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+/**
+ * This test makes sure that only the expected classes are included in the java-instance.jar.
+ *
+ * <p>That jar should only contain the interfaces that the Pulsar Functions framework uses to
+ * interact with user code, i.e. classes from the following libraries: pulsar-io-core,
+ * pulsar-functions-api, pulsar-client-api, slf4j-api, log4j-slf4j-impl, log4j-api and log4j-core.
+ */
+@Slf4j
+public class JavaInstanceDepsTest {
+
+    /**
+     * Reads every entry of target/java-instance.jar and collects each .class entry outside META-INF
+     * whose path does not start with one of the allowed package prefixes. The test fails, reporting
+     * the collected entries, if that list is not empty.
+     *
+     * @throws IOException if the jar cannot be opened or read
+     */
+    @Test
+    public void testInstanceJarDeps() throws IOException {
+        File jar = new File("target/java-instance.jar");
+
+        @Cleanup
+        ZipInputStream zip = new ZipInputStream(jar.toURI().toURL().openStream());
+
+        List<String> notAllowedClasses = new LinkedList<>();
+        // getNextEntry() returns null once the end of the archive has been reached
+        for (ZipEntry entry = zip.getNextEntry(); entry != null; entry = zip.getNextEntry()) {
+            String name = entry.getName();
+            if (name.endsWith(".class") && !name.startsWith("META-INF")) {
+                // The only classes in the java-instance.jar should be org.apache.pulsar, slf4j, and log4j classes
+                // filter out those classes to see if there are any other classes that should not be allowed
+                if (!name.startsWith("org/apache/pulsar")
+                        && !name.startsWith("org/slf4j")
+                        && !name.startsWith("org/apache/logging/slf4j")
+                        && !name.startsWith("org/apache/logging/log4j")) {
+                    notAllowedClasses.add(name);
+                }
+            }
+        }
+
+        Assert.assertEquals(notAllowedClasses, Collections.emptyList(), notAllowedClasses.toString());
+    }
+}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
index e0a9c64..474410c 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
@@ -179,7 +179,6 @@ public class ThreadRuntime implements Runtime {
                 String.format("%s-%s",
                         FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()),
                         instanceConfig.getInstanceId()));
-        this.fnThread.setContextClassLoader(functionClassLoader);
         this.fnThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
             @Override
             public void uncaughtException(Thread t, Throwable e) {
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java
index 2db9d6c..ba57692 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java
@@ -27,6 +27,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -44,7 +45,7 @@ public class KafkaSchemaWrappedSchema implements Schema<byte[]>, Serializable {
         Map<String, String> props = new HashMap<>();
         boolean isJsonConverter = converter instanceof JsonConverter;
         props.put(GenericAvroSchema.OFFSET_PROP, isJsonConverter ? "0" : "5");
-        this.schemaInfo = SchemaInfo.builder()
+        this.schemaInfo = SchemaInfoImpl.builder()
                 .name(isJsonConverter? "KafKaJson" : "KafkaAvro")
                 .type(isJsonConverter ? SchemaType.JSON : SchemaType.AVRO)
                 .schema(schema.toString().getBytes(UTF_8))
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
index eda8c96..2a9e1c4 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
@@ -25,6 +25,7 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
@@ -65,7 +66,7 @@ final class AvroSchemaCache {
             org.apache.avro.Schema schema = schemaRegistryClient.getById(schemaId);
             String definition = schema.toString(false);
             log.info("Schema {} definition {}", schemaId, definition);
-            SchemaInfo schemaInfo = SchemaInfo.builder()
+            SchemaInfo schemaInfo = SchemaInfoImpl.builder()
                     .type(SchemaType.AVRO)
                     .name(schema.getName())
                     .properties(Collections.emptyMap())
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
index 81bf031..47bedbe 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
@@ -42,6 +42,7 @@ import org.apache.kafka.common.serialization.ShortDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -235,7 +236,7 @@ public class KafkaBytesSource extends KafkaAbstractSource<ByteBuffer> {
 
      static final class DeferredSchemaPlaceholder extends ByteBufferSchemaWrapper {
         DeferredSchemaPlaceholder() {
-            super(SchemaInfo
+            super(SchemaInfoImpl
                     .builder()
                     .type(SchemaType.AVRO)
                     .properties(Collections.emptyMap())
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index b155e45..3f0bc09 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -450,7 +450,6 @@ The Apache Software License, Version 2.0
   * Apache Zookeeper
     - zookeeper-3.6.3.jar
     - zookeeper-jute-3.6.3.jar
-    - zookeeper-prometheus-metrics-3.6.3.jar
   * Apache Yetus Audience Annotations
     - audience-annotations-0.5.0.jar
   * Swagger
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
index 03a6b77..645edbd 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
@@ -32,6 +32,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -101,7 +102,7 @@ public class PulsarSplit implements ConnectorSplit {
         this.offloadPolicies = offloadPolicies;
 
         ObjectMapper objectMapper = new ObjectMapper();
-        this.schemaInfo = SchemaInfo.builder()
+        this.schemaInfo = SchemaInfoImpl.builder()
                 .name(originSchemaName)
                 .type(schemaType)
                 .schema(schema.getBytes("ISO8859-1"))
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
index cd31b4b..79fb789 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
@@ -23,6 +23,7 @@ import io.prestosql.spi.PrestoException;
 import io.prestosql.spi.connector.*;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -188,9 +189,10 @@ public class TestPulsarMetadata extends TestPulsarConnector {
     @Test(dataProvider = "rewriteNamespaceDelimiter", singleThreaded = true)
     public void testGetTableMetadataTableBlankSchema(String delimiter) throws PulsarAdminException {
         updateRewriteNamespaceDelimiterIfNeeded(delimiter);
-        SchemaInfo badSchemaInfo = new SchemaInfo();
-        badSchemaInfo.setSchema(new byte[0]);
-        badSchemaInfo.setType(SchemaType.AVRO);
+        SchemaInfo badSchemaInfo = SchemaInfoImpl.builder()
+                .schema(new byte[0])
+                .type(SchemaType.AVRO)
+                .build();
         when(this.schemas.getSchemaInfo(eq(TOPIC_1.getSchemaName()))).thenReturn(badSchemaInfo);
 
         PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(
@@ -214,9 +216,10 @@ public class TestPulsarMetadata extends TestPulsarConnector {
     @Test(dataProvider = "rewriteNamespaceDelimiter", singleThreaded = true)
     public void testGetTableMetadataTableInvalidSchema(String delimiter) throws PulsarAdminException {
         updateRewriteNamespaceDelimiterIfNeeded(delimiter);
-        SchemaInfo badSchemaInfo = new SchemaInfo();
-        badSchemaInfo.setSchema("foo".getBytes());
-        badSchemaInfo.setType(SchemaType.AVRO);
+        SchemaInfo badSchemaInfo = SchemaInfoImpl.builder()
+                .schema("foo".getBytes())
+                .type(SchemaType.AVRO)
+                .build();
         when(this.schemas.getSchemaInfo(eq(TOPIC_1.getSchemaName()))).thenReturn(badSchemaInfo);
 
         PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/TestPrimitiveDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/TestPrimitiveDecoder.java
index f4810ba..c1b97d3 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/TestPrimitiveDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/TestPrimitiveDecoder.java
@@ -22,6 +22,7 @@ import io.airlift.slice.Slices;
 import io.prestosql.decoder.DecoderColumnHandle;
 import io.prestosql.decoder.FieldValueProvider;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.sql.presto.PulsarColumnHandle;
@@ -65,7 +66,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester {
     public void testPrimitiveType() {
 
         byte int8Value = 1;
-        SchemaInfo schemaInfoInt8 = SchemaInfo.builder().type(SchemaType.INT8).build();
+        SchemaInfo schemaInfoInt8 = SchemaInfoImpl.builder().type(SchemaType.INT8).build();
         Schema schemaInt8 = Schema.getSchema(schemaInfoInt8);
         List<PulsarColumnHandle> pulsarColumnHandleInt8 = getColumnColumnHandles(topicName, schemaInfoInt8, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
         PulsarRowDecoder pulsarRowDecoderInt8 = decoderFactory.createRowDecoder(topicName, schemaInfoInt8,
@@ -77,7 +78,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester {
                 PRIMITIVE_COLUMN_NAME, TINYINT, false, false, PRIMITIVE_COLUMN_NAME, null, null, PulsarColumnHandle.HandleKeyValueType.NONE), int8Value);
 
         short int16Value = 2;
-        SchemaInfo schemaInfoInt16 = SchemaInfo.builder().type(SchemaType.INT16).build();
+        SchemaInfo schemaInfoInt16 = SchemaInfoImpl.builder().type(SchemaType.INT16).build();
         Schema schemaInt16 = Schema.getSchema(schemaInfoInt16);
         List<PulsarColumnHandle> pulsarColumnHandleInt16 = getColumnColumnHandles(topicName, schemaInfoInt16, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
         PulsarRowDecoder pulsarRowDecoderInt16 = decoderFactory.createRowDecoder(topicName, schemaInfoInt16,
@@ -89,7 +90,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester {
                 PRIMITIVE_COLUMN_NAME, SMALLINT, false, false, PRIMITIVE_COLUMN_NAME, null, null, PulsarColumnHandle.HandleKeyValueType.NONE), int16Value);
 
         int int32Value = 2;
-        SchemaInfo schemaInfoInt32 = SchemaInfo.builder().type(SchemaType.INT32).build();
+        SchemaInfo schemaInfoInt32 = SchemaInfoImpl.builder().type(SchemaType.INT32).build();
         Schema schemaInt32 = Schema.getSchema(schemaInfoInt32);
         List<PulsarColumnHandle> pulsarColumnHandleInt32 = getColumnColumnHandles(topicName, schemaInfoInt32,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -102,7 +103,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester {
                 PRIMITIVE_COLUMN_NAME, INTEGER, false, false, PRIMITIVE_COLUMN_NAME, null, null, PulsarColumnHandle.HandleKeyValueType.NONE), int32Value);
 
         long int64Value = 2;
-        SchemaInfo schemaInfoInt64 = SchemaInfo.builder().type(SchemaType.INT64).build();
+        SchemaInfo schemaInfoInt64 = SchemaInfoImpl.builder().type(SchemaType.INT64).build();
         Schema schemaInt64 = Schema.getSchema(schemaInfoInt64);
         List<PulsarColumnHandle> pulsarColumnHandleInt64 = getColumnColumnHandles(topicName, schemaInfoInt64,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -116,7 +117,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester {
                 PulsarColumnHandle.HandleKeyValueType.NONE), int64Value);
 
         String stringValue = "test";
-        SchemaInfo schemaInfoString = SchemaInfo.builder().type(SchemaType.STRING).build();
+        SchemaInfo schemaInfoString = SchemaInfoImpl.builder().type(SchemaType.STRING).build();
         Schema schemaString = Schema.getSchema(schemaInfoString);
         List<PulsarColumnHandle> pulsarColumnHandleString = getColumnColumnHandles(topicName, schemaInfoString,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -130,7 +131,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester {
                 PulsarColumnHandle.HandleKeyValueType.NONE), stringValue);
 
         float floatValue = 0.2f;
-        SchemaInfo schemaInfoFloat = SchemaInfo.builder().type(SchemaType.FLOAT).build();
+        SchemaInfo schemaInfoFloat = SchemaInfoImpl.builder().type(SchemaType.FLOAT).build();
         Schema schemaFloat = Schema.getSchema(schemaInfoFloat);
         List<PulsarColumnHandle> pulsarColumnHandleFloat = getColumnColumnHandles(topicName, schemaInfoFloat,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -144,7 +145,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester {
                 PulsarColumnHandle.HandleKeyValueType.NONE), Long.valueOf(Float.floatToIntBits(floatValue)));
 
         double doubleValue = 0.22d;
-        SchemaInfo schemaInfoDouble = SchemaInfo.builder().type(SchemaType.DOUBLE).build();
+        SchemaInfo schemaInfoDouble = SchemaInfoImpl.builder().type(SchemaType.DOUBLE).build();
         Schema schemaDouble = Schema.getSchema(schemaInfoDouble);
         List<PulsarColumnHandle> pulsarColumnHandleDouble = getColumnColumnHandles(topicName, schemaInfoDouble,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -158,7 +159,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester {
                 PulsarColumnHandle.HandleKeyValueType.NONE), doubleValue);
 
         boolean booleanValue = true;
-        SchemaInfo schemaInfoBoolean = SchemaInfo.builder().type(SchemaType.BOOLEAN).build();
+        SchemaInfo schemaInfoBoolean = SchemaInfoImpl.builder().type(SchemaType.BOOLEAN).build();
         Schema schemaBoolean = Schema.getSchema(schemaInfoBoolean);
         List<PulsarColumnHandle> pulsarColumnHandleBoolean = getColumnColumnHandles(topicName, schemaInfoBoolean,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -173,7 +174,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester {
 
         byte[] bytesValue = new byte[1];
         bytesValue[0] = 1;
-        SchemaInfo schemaInfoBytes = SchemaInfo.builder().type(SchemaType.BYTES).build();
+        SchemaInfo schemaInfoBytes = SchemaInfoImpl.builder().type(SchemaType.BYTES).build();
         Schema schemaBytes = Schema.getSchema(schemaInfoBytes);
         List<PulsarColumnHandle> pulsarColumnHandleBytes = getColumnColumnHandles(topicName, schemaInfoBytes,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -187,7 +188,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester {
                 PulsarColumnHandle.HandleKeyValueType.NONE), Slices.wrappedBuffer(bytesValue));
 
         Date dateValue = new Date(System.currentTimeMillis());
-        SchemaInfo schemaInfoDate = SchemaInfo.builder().type(SchemaType.DATE).build();
+        SchemaInfo schemaInfoDate = SchemaInfoImpl.builder().type(SchemaType.DATE).build();
         Schema schemaDate = Schema.getSchema(schemaInfoDate);
         List<PulsarColumnHandle> pulsarColumnHandleDate = getColumnColumnHandles(topicName, schemaInfoDate,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -201,7 +202,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester {
                 PulsarColumnHandle.HandleKeyValueType.NONE), dateValue.getTime());
 
         Time timeValue = new Time(System.currentTimeMillis());
-        SchemaInfo schemaInfoTime = SchemaInfo.builder().type(SchemaType.TIME).build();
+        SchemaInfo schemaInfoTime = SchemaInfoImpl.builder().type(SchemaType.TIME).build();
         Schema schemaTime = Schema.getSchema(schemaInfoTime);
         List<PulsarColumnHandle> pulsarColumnHandleTime = getColumnColumnHandles(topicName, schemaInfoTime,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -215,7 +216,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester {
                 PulsarColumnHandle.HandleKeyValueType.NONE), timeValue.getTime());
 
         Timestamp timestampValue = new Timestamp(System.currentTimeMillis());
-        SchemaInfo schemaInfoTimestamp = SchemaInfo.builder().type(SchemaType.TIMESTAMP).build();
+        SchemaInfo schemaInfoTimestamp = SchemaInfoImpl.builder().type(SchemaType.TIMESTAMP).build();
         Schema schemaTimestamp = Schema.getSchema(schemaInfoTimestamp);
         List<PulsarColumnHandle> pulsarColumnHandleTimestamp = getColumnColumnHandles(topicName, schemaInfoTimestamp,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
diff --git a/tests/docker-images/java-test-functions/pom.xml b/tests/docker-images/java-test-functions/pom.xml
index bb5190e..140168d 100644
--- a/tests/docker-images/java-test-functions/pom.xml
+++ b/tests/docker-images/java-test-functions/pom.xml
@@ -36,7 +36,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.pulsar</groupId>
-            <artifactId>pulsar-client-original</artifactId>
+            <artifactId>pulsar-client</artifactId>
             <version>${project.version}</version>
         </dependency>
     </dependencies>
@@ -82,24 +82,10 @@
                             </transformers>
                             <artifactSet>
                                 <includes>
-                                    <include>org.apache.pulsar:pulsar-client-original</include>
-                                    <include>org.apache.pulsar:pulsar-client-api</include>
-                                    <include>org.apache.pulsar:pulsar-client-admin-api</include>
+                                    <include>org.apache.pulsar:pulsar-client</include>
                                     <include>org.apache.pulsar:pulsar-functions-api-examples</include>
                                 </includes>
                             </artifactSet>
-                            <filters>
-                                <filter>
-                                    <artifact>org.apache.pulsar:pulsar-client-original</artifact>
-                                    <includes>
-                                        <include>**</include>
-                                    </includes>
-                                    <excludes>
-                                        <!-- bouncycastle jars could not be shaded, or the signatures will be wrong-->
-                                        <exclude>org/bouncycastle/**</exclude>
-                                    </excludes>
-                                </filter>
-                            </filters>
                         </configuration>
                     </execution>
                 </executions>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/GenericRecordSourceTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/GenericRecordSourceTest.java
index e0cc328..3c961fa 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/GenericRecordSourceTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/GenericRecordSourceTest.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.tests.integration.docker.ContainerExecException;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 /**
@@ -59,41 +60,44 @@ public class GenericRecordSourceTest extends PulsarStandaloneTestSuite {
         String outputTopicName = "test-state-source-output-" + randomName(8);
         String sourceName = "test-state-source-" + randomName(8);
         int numMessages = 10;
+        try {
+            submitSourceConnector(
+                    sourceName,
+                    outputTopicName,
+                    "org.apache.pulsar.tests.integration.io.GenericRecordSource", JAVAJAR);
 
-        submitSourceConnector(
-            sourceName,
-            outputTopicName,
-            "org.apache.pulsar.tests.integration.io.GenericRecordSource", JAVAJAR);
-
-        // get source info
-        getSourceInfoSuccess(container, sourceName);
+            // get source info
+            getSourceInfoSuccess(container, sourceName);
 
-        // get source status
-        getSourceStatus(container, sourceName);
+            // get source status
+            getSourceStatus(container, sourceName);
 
-        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
+            try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {
 
-            retryStrategically((test) -> {
-                try {
-                    SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
-                    return status.getInstances().size() > 0
-                        && status.getInstances().get(0).getStatus().numWritten >= 10;
-                } catch (PulsarAdminException e) {
-                    return false;
-                }
-            }, 10, 200);
+                retryStrategically((test) -> {
+                    try {
+                        SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
+                        return status.getInstances().size() > 0
+                                && status.getInstances().get(0).getStatus().numWritten >= 10;
+                    } catch (PulsarAdminException e) {
+                        return false;
+                    }
+                }, 10, 200);
 
-            SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
-            assertEquals(status.getInstances().size(), 1);
-            assertTrue(status.getInstances().get(0).getStatus().numWritten >= 10);
-        }
+                SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
+                assertEquals(status.getInstances().size(), 1);
+                assertTrue(status.getInstances().get(0).getStatus().numWritten >= 10);
+            }
 
-        consumeMessages(container, outputTopicName, numMessages);
+            consumeMessages(container, outputTopicName, numMessages);
 
-        // delete source
-        deleteSource(container, sourceName);
+            // delete source
+            deleteSource(container, sourceName);
 
-        getSourceInfoNotFound(container, sourceName);
+            getSourceInfoNotFound(container, sourceName);
+        } finally {
+            dumpFunctionLogs(sourceName);
+        }
 
     }
 
@@ -129,15 +133,35 @@ public class GenericRecordSourceTest extends PulsarStandaloneTestSuite {
     }
 
     private static void getSourceStatus(StandaloneContainer container,String sourceName) throws Exception {
+        retryStrategically((test) -> {
+                    try {
+                        ContainerExecResult result = container.execCmd(
+                                PulsarCluster.ADMIN_SCRIPT,
+                                "sources",
+                                "status",
+                                "--tenant", "public",
+                                "--namespace", "default",
+                                "--name", sourceName);
+
+                        if (result.getStdout().contains("\"running\" : true")) {
+                            return true;
+                        }
+                        return false;
+                    } catch (Exception e) {
+                        log.error("Encountered error when getting source status", e);
+                        return false;
+                    }
+                }, 10, 200);
+
         ContainerExecResult result = container.execCmd(
-            PulsarCluster.ADMIN_SCRIPT,
-            "sources",
-            "status",
-            "--tenant", "public",
-            "--namespace", "default",
-            "--name", sourceName
-        );
-        assertTrue(result.getStdout().contains("\"running\" : true"));
+                PulsarCluster.ADMIN_SCRIPT,
+                "sources",
+                "status",
+                "--tenant", "public",
+                "--namespace", "default",
+                "--name", sourceName);
+
+        Assert.assertTrue(result.getStdout().contains("\"running\" : true"));
     }
 
     private static void consumeMessages(StandaloneContainer container, String outputTopic,

[pulsar] 02/03: [Security] Upgrade Zookeeper to 3.6.3 (#10852)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 09290157d8b7ded1387bc19cf412d59296756949
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Mon Jun 7 20:27:23 2021 +0300

    [Security] Upgrade Zookeeper to 3.6.3 (#10852)
    
    - Zookeeper 3.6.2 gets flagged as vulnerable
      https://ossindex.sonatype.org/component/pkg:maven/org.apache.zookeeper/zookeeper@3.6.2
      because it uses a vulnerable Netty version
    
    (cherry picked from commit aa36eb8e933704ffdccbda9b4f1fded39c6e135c)
---
 distribution/server/src/assemble/LICENSE.bin.txt | 6 +++---
 pom.xml                                          | 2 +-
 pulsar-sql/presto-distribution/LICENSE           | 6 +++---
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 3005df8..d9874a0 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -510,9 +510,9 @@ The Apache Software License, Version 2.0
     - io.vertx-vertx-core-3.5.4.jar
     - io.vertx-vertx-web-3.5.4.jar
   * Apache ZooKeeper
-    - org.apache.zookeeper-zookeeper-3.6.2.jar
-    - org.apache.zookeeper-zookeeper-jute-3.6.2.jar
-    - org.apache.zookeeper-zookeeper-prometheus-metrics-3.6.2.jar
+    - org.apache.zookeeper-zookeeper-3.6.3.jar
+    - org.apache.zookeeper-zookeeper-jute-3.6.3.jar
+    - org.apache.zookeeper-zookeeper-prometheus-metrics-3.6.3.jar
   * Snappy Java
     - org.xerial.snappy-snappy-java-1.1.7.jar
   * Google HTTP Client
diff --git a/pom.xml b/pom.xml
index 578c05d..f4ce36c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,7 +104,7 @@ flexible messaging model and an intuitive client API.</description>
     <commons-compress.version>1.19</commons-compress.version>
 
     <bookkeeper.version>4.14.1</bookkeeper.version>
-    <zookeeper.version>3.6.2</zookeeper.version>
+    <zookeeper.version>3.6.3</zookeeper.version>
     <snappy.version>1.1.7</snappy.version> <!-- ZooKeeper server -->
     <dropwizardmetrics.version>3.2.5</dropwizardmetrics.version> <!-- ZooKeeper server -->
     <curator.version>5.1.0</curator.version>
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index 0933670..b155e45 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -448,9 +448,9 @@ The Apache Software License, Version 2.0
     - memory-0.8.3.jar
     - sketches-core-0.8.3.jar
   * Apache Zookeeper
-    - zookeeper-3.6.2.jar
-    - zookeeper-jute-3.6.2.jar
-    - zookeeper-prometheus-metrics-3.6.2.jar
+    - zookeeper-3.6.3.jar
+    - zookeeper-jute-3.6.3.jar
+    - zookeeper-prometheus-metrics-3.6.3.jar
   * Apache Yetus Audience Annotations
     - audience-annotations-0.5.0.jar
   * Swagger

[pulsar] 01/03: Upgrade Jetty to 9.4.42.v20210604 (#10907)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1916324c8f07bb43a2693921fb18ec5cdd5278ac
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Fri Jun 11 21:12:26 2021 +0300

    Upgrade Jetty to 9.4.42.v20210604 (#10907)
    
    Fixes #10906
    
    Also addresses CVE-2021-28169
    
    (cherry picked from commit 6c03154ff29868181124a0a1a81e9ea09b22a9b0)
---
 distribution/server/src/assemble/LICENSE.bin.txt | 38 ++++++++++++------------
 pom.xml                                          |  2 +-
 pulsar-sql/presto-distribution/LICENSE           | 32 ++++++++++----------
 3 files changed, 36 insertions(+), 36 deletions(-)

diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 1b40206..3005df8 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -424,25 +424,25 @@ The Apache Software License, Version 2.0
     - org.asynchttpclient-async-http-client-2.12.1.jar
     - org.asynchttpclient-async-http-client-netty-utils-2.12.1.jar
  * Jetty
-    - org.eclipse.jetty-jetty-client-9.4.39.v20210325.jar
-    - org.eclipse.jetty-jetty-continuation-9.4.39.v20210325.jar
-    - org.eclipse.jetty-jetty-http-9.4.39.v20210325.jar
-    - org.eclipse.jetty-jetty-io-9.4.39.v20210325.jar
-    - org.eclipse.jetty-jetty-proxy-9.4.39.v20210325.jar
-    - org.eclipse.jetty-jetty-security-9.4.39.v20210325.jar
-    - org.eclipse.jetty-jetty-server-9.4.39.v20210325.jar
-    - org.eclipse.jetty-jetty-servlet-9.4.39.v20210325.jar
-    - org.eclipse.jetty-jetty-servlets-9.4.39.v20210325.jar
-    - org.eclipse.jetty-jetty-util-9.4.39.v20210325.jar
-    - org.eclipse.jetty-jetty-util-ajax-9.4.39.v20210325.jar
-    - org.eclipse.jetty.websocket-javax-websocket-client-impl-9.4.39.v20210325.jar
-    - org.eclipse.jetty.websocket-websocket-api-9.4.39.v20210325.jar
-    - org.eclipse.jetty.websocket-websocket-client-9.4.39.v20210325.jar
-    - org.eclipse.jetty.websocket-websocket-common-9.4.39.v20210325.jar
-    - org.eclipse.jetty.websocket-websocket-server-9.4.39.v20210325.jar
-    - org.eclipse.jetty.websocket-websocket-servlet-9.4.39.v20210325.jar
-    - org.eclipse.jetty-jetty-alpn-conscrypt-server-9.4.39.v20210325.jar
-    - org.eclipse.jetty-jetty-alpn-server-9.4.39.v20210325.jar
+    - org.eclipse.jetty-jetty-client-9.4.42.v20210604.jar
+    - org.eclipse.jetty-jetty-continuation-9.4.42.v20210604.jar
+    - org.eclipse.jetty-jetty-http-9.4.42.v20210604.jar
+    - org.eclipse.jetty-jetty-io-9.4.42.v20210604.jar
+    - org.eclipse.jetty-jetty-proxy-9.4.42.v20210604.jar
+    - org.eclipse.jetty-jetty-security-9.4.42.v20210604.jar
+    - org.eclipse.jetty-jetty-server-9.4.42.v20210604.jar
+    - org.eclipse.jetty-jetty-servlet-9.4.42.v20210604.jar
+    - org.eclipse.jetty-jetty-servlets-9.4.42.v20210604.jar
+    - org.eclipse.jetty-jetty-util-9.4.42.v20210604.jar
+    - org.eclipse.jetty-jetty-util-ajax-9.4.42.v20210604.jar
+    - org.eclipse.jetty.websocket-javax-websocket-client-impl-9.4.42.v20210604.jar
+    - org.eclipse.jetty.websocket-websocket-api-9.4.42.v20210604.jar
+    - org.eclipse.jetty.websocket-websocket-client-9.4.42.v20210604.jar
+    - org.eclipse.jetty.websocket-websocket-common-9.4.42.v20210604.jar
+    - org.eclipse.jetty.websocket-websocket-server-9.4.42.v20210604.jar
+    - org.eclipse.jetty.websocket-websocket-servlet-9.4.42.v20210604.jar
+    - org.eclipse.jetty-jetty-alpn-conscrypt-server-9.4.42.v20210604.jar
+    - org.eclipse.jetty-jetty-alpn-server-9.4.42.v20210604.jar
  * SnakeYaml -- org.yaml-snakeyaml-1.27.jar
  * RocksDB - org.rocksdb-rocksdbjni-6.10.2.jar
  * Google Error Prone Annotations - com.google.errorprone-error_prone_annotations-2.5.1.jar
diff --git a/pom.xml b/pom.xml
index d24e404..578c05d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -110,7 +110,7 @@ flexible messaging model and an intuitive client API.</description>
     <curator.version>5.1.0</curator.version>
     <netty.version>4.1.63.Final</netty.version>
     <netty-tc-native.version>2.0.38.Final</netty-tc-native.version>
-    <jetty.version>9.4.39.v20210325</jetty.version>
+    <jetty.version>9.4.42.v20210604</jetty.version>
     <conscrypt.version>2.5.2</conscrypt.version>
     <jersey.version>2.34</jersey.version>
     <athenz.version>1.10.9</athenz.version>
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index 7b92783..0933670 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -251,22 +251,22 @@ The Apache Software License, Version 2.0
  * Joda Time
     - joda-time-2.10.5.jar
   * Jetty
-    - http2-client-9.4.39.v20210325.jar
-    - http2-common-9.4.39.v20210325.jar
-    - http2-hpack-9.4.39.v20210325.jar
-    - http2-http-client-transport-9.4.39.v20210325.jar
-    - jetty-alpn-client-9.4.39.v20210325.jar
-    - http2-server-9.4.39.v20210325.jar
-    - jetty-alpn-java-client-9.4.39.v20210325.jar
-    - jetty-client-9.4.39.v20210325.jar
-    - jetty-http-9.4.39.v20210325.jar
-    - jetty-io-9.4.39.v20210325.jar
-    - jetty-jmx-9.4.39.v20210325.jar
-    - jetty-security-9.4.39.v20210325.jar
-    - jetty-server-9.4.39.v20210325.jar
-    - jetty-servlet-9.4.39.v20210325.jar
-    - jetty-util-9.4.39.v20210325.jar
-    - jetty-util-ajax-9.4.39.v20210325.jar
+    - http2-client-9.4.42.v20210604.jar
+    - http2-common-9.4.42.v20210604.jar
+    - http2-hpack-9.4.42.v20210604.jar
+    - http2-http-client-transport-9.4.42.v20210604.jar
+    - jetty-alpn-client-9.4.42.v20210604.jar
+    - http2-server-9.4.42.v20210604.jar
+    - jetty-alpn-java-client-9.4.42.v20210604.jar
+    - jetty-client-9.4.42.v20210604.jar
+    - jetty-http-9.4.42.v20210604.jar
+    - jetty-io-9.4.42.v20210604.jar
+    - jetty-jmx-9.4.42.v20210604.jar
+    - jetty-security-9.4.42.v20210604.jar
+    - jetty-server-9.4.42.v20210604.jar
+    - jetty-servlet-9.4.42.v20210604.jar
+    - jetty-util-9.4.42.v20210604.jar
+    - jetty-util-ajax-9.4.42.v20210604.jar
   * Apache BVal
     - bval-jsr-2.0.0.jar
   * Bytecode