You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/03/19 15:25:39 UTC

[pulsar] branch master updated: revise the schema default type not null (#3752)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a1c557  revise the schema default type not null (#3752)
1a1c557 is described below

commit 1a1c557b27bdc4e48fcd4a2da6f71e0d78873697
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Tue Mar 19 23:25:34 2019 +0800

    revise the schema default type not null (#3752)
    
    ### Motivation
    Fix #3741
    
    ### Modifications
    Support define not not allow null field in schema
    
    ### Verifying this change
    Add not allow null field schema verify
    
    Does this pull request potentially affect one of the following parts:
    If yes was chosen, please highlight the changes
    
    Dependencies (does it add or upgrade a dependency): (no)
    The public API: (no)
    The schema: (yes)
    The default values of configurations: (no)
    The wire protocol: (no)
    The rest endpoints: (no)
    The admin cli options: (no)
    Anything that affects deployment: (no)
---
 .../broker/service/schema/ClientGetSchemaTest.java |   8 +
 .../schema/JsonSchemaCompatibilityCheckTest.java   |   5 +-
 .../api/SimpleTypedProducerConsumerTest.java       |  28 ++--
 .../java/org/apache/pulsar/client/api/Schema.java  |  49 +++---
 .../pulsar/client/api/schema/SchemaDefinition.java |  64 ++++++++
 .../client/api/schema/SchemaDefinitionBuilder.java |  81 ++++++++++
 .../client/internal/DefaultImplementation.java     |  37 ++---
 .../pulsar/client/impl/schema/AvroSchema.java      |  34 ++---
 .../pulsar/client/impl/schema/JSONSchema.java      |  36 ++---
 .../impl/schema/SchemaDefinitionBuilderImpl.java   |  95 ++++++++++++
 .../client/impl/schema/SchemaDefinitionImpl.java   |  98 ++++++++++++
 .../pulsar/client/impl/schema/StructSchema.java    |  10 +-
 .../pulsar/client/impl/schema/AvroSchemaTest.java  |  79 ++++++++--
 .../pulsar/client/impl/schema/JSONSchemaTest.java  | 167 +++++++++++++++++++--
 .../client/impl/schema/KeyValueSchemaTest.java     | 164 +++++++++++++++++---
 .../client/impl/schema/SchemaBuilderTest.java      |   7 +
 .../pulsar/client/impl/schema/SchemaTestUtils.java |  40 +++--
 .../impl/schema/generic/GenericSchemaImplTest.java |   4 +-
 .../MultiVersionGenericSchemaProviderTest.java     |   4 +-
 .../tutorial/SampleAsyncProducerWithSchema.java    |   4 +-
 .../client/tutorial/SampleConsumerWithSchema.java  |   4 +-
 .../pulsar/functions/source/TopicSchema.java       |   5 +-
 .../io/hbase/sink/HbaseGenericRecordSinkTest.java  |   3 +-
 .../org/apache/pulsar/io/jdbc/JdbcSinkTest.java    |   3 +-
 .../pulsar/sql/presto/TestPulsarConnector.java     |  27 ++--
 .../tests/integration/io/JdbcSinkTester.java       |   3 +-
 26 files changed, 868 insertions(+), 191 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
index 7055492..9fcd231 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/ClientGetSchemaTest.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -43,6 +44,8 @@ public class ClientGetSchemaTest extends ProducerConsumerBase {
     private static final String topicString = "my-property/my-ns/topic-string";
     private static final String topicJson = "my-property/my-ns/topic-json";
     private static final String topicAvro = "my-property/my-ns/topic-avro";
+    private static final String topicJsonNotNull = "my-property/my-ns/topic-json-not-null";
+    private static final String topicAvroNotNull = "my-property/my-ns/topic-avro-not-null";
 
     List<Producer<?>> producers = new ArrayList<>();
 
@@ -62,6 +65,11 @@ public class ClientGetSchemaTest extends ProducerConsumerBase {
         producers.add(pulsarClient.newProducer(Schema.STRING).topic(topicString).create());
         producers.add(pulsarClient.newProducer(Schema.AVRO(MyClass.class)).topic(topicAvro).create());
         producers.add(pulsarClient.newProducer(Schema.JSON(MyClass.class)).topic(topicJson).create());
+        producers.add(pulsarClient.newProducer(Schema.AVRO(SchemaDefinition.<MyClass>builder().withPojo(MyClass.class).build())).topic(topicAvro).create());
+        producers.add(pulsarClient.newProducer(Schema.JSON(SchemaDefinition.<MyClass>builder().withPojo(MyClass.class).build())).topic(topicJson).create());
+        producers.add(pulsarClient.newProducer(Schema.AVRO(SchemaDefinition.<MyClass>builder().withPojo(MyClass.class).withAlwaysAllowNull(false).build())).topic(topicAvroNotNull).create());
+        producers.add(pulsarClient.newProducer(Schema.JSON(SchemaDefinition.<MyClass>builder().withPojo(MyClass.class).withAlwaysAllowNull(false).build())).topic(topicJsonNotNull).create());
+
     }
 
     @AfterClass
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
index 17b39b7..a785927 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
@@ -33,6 +33,7 @@ import lombok.ToString;
 
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.common.schema.SchemaData;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -51,11 +52,11 @@ public class JsonSchemaCompatibilityCheckTest extends BaseAvroSchemaCompatibilit
     public void testJsonSchemaBackwardsCompatibility() throws JsonProcessingException {
 
         SchemaData from = SchemaData.builder().data(OldJSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
-        SchemaData to = SchemaData.builder().data(JSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
+        SchemaData to = SchemaData.builder().data(JSONSchema.of(SchemaDefinition.builder().withPojo(Foo.class).build()).getSchemaInfo().getSchema()).build();
         JsonSchemaCompatibilityCheck jsonSchemaCompatibilityCheck = new JsonSchemaCompatibilityCheck();
         Assert.assertTrue(jsonSchemaCompatibilityCheck.isCompatible(from, to, SchemaCompatibilityStrategy.FULL));
 
-        from = SchemaData.builder().data(JSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
+        from = SchemaData.builder().data(JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build()).getSchemaInfo().getSchema()).build();
         to = SchemaData.builder().data(OldJSONSchema.of(Foo.class).getSchemaInfo().getSchema()).build();
         Assert.assertTrue(jsonSchemaCompatibilityCheck.isCompatible(from, to, SchemaCompatibilityStrategy.FULL));
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
index c9e919e..050db39 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
 import org.apache.pulsar.broker.service.schema.SchemaRegistry;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.client.impl.schema.ProtobufSchema;
@@ -63,7 +64,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
         log.info("-- Starting {} test --", methodName);
 
         JSONSchema<JsonEncodedPojo> jsonSchema =
-            JSONSchema.of(JsonEncodedPojo.class);
+            JSONSchema.of(SchemaDefinition.<JsonEncodedPojo>builder().withPojo(JsonEncodedPojo.class).build());
 
         Consumer<JsonEncodedPojo> consumer = pulsarClient
             .newConsumer(jsonSchema)
@@ -108,7 +109,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
         log.info("-- Starting {} test --", methodName);
 
         JSONSchema<JsonEncodedPojo> jsonSchema =
-            JSONSchema.of(JsonEncodedPojo.class);
+            JSONSchema.of(SchemaDefinition.<JsonEncodedPojo>builder().withPojo(JsonEncodedPojo.class).build());
 
         pulsar.getSchemaRegistryService()
             .putSchemaIfAbsent("my-property/my-ns/my-topic1",
@@ -166,7 +167,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
             ).get();
 
         Consumer<JsonEncodedPojo> consumer = pulsarClient
-            .newConsumer(JSONSchema.of(JsonEncodedPojo.class))
+            .newConsumer(JSONSchema.of(SchemaDefinition.<JsonEncodedPojo>builder().withPojo(JsonEncodedPojo.class).build()))
             .topic("persistent://my-property/use/my-ns/my-topic1")
             .subscriptionName("my-subscriber-name")
             .subscribe();
@@ -194,7 +195,7 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
             ).get();
 
         Producer<JsonEncodedPojo> producer = pulsarClient
-            .newProducer(JSONSchema.of(JsonEncodedPojo.class))
+            .newProducer(JSONSchema.of(SchemaDefinition.<JsonEncodedPojo>builder().withPojo(JsonEncodedPojo.class).build()))
             .topic("persistent://my-property/use/my-ns/my-topic1")
             .create();
 
@@ -273,7 +274,9 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
                 ).get();
 
         Consumer<org.apache.pulsar.client.api.schema.proto.Test.TestMessageWrong> consumer = pulsarClient
-                .newConsumer(AvroSchema.of(org.apache.pulsar.client.api.schema.proto.Test.TestMessageWrong.class))
+                .newConsumer(AvroSchema.of
+                        (SchemaDefinition.<org.apache.pulsar.client.api.schema.proto.Test.TestMessageWrong>builder().
+                        withPojo(org.apache.pulsar.client.api.schema.proto.Test.TestMessageWrong.class).build()))
                 .topic("persistent://my-property/use/my-ns/my-topic1")
                 .subscriptionName("my-subscriber-name")
                 .subscribe();
@@ -286,7 +289,8 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
        log.info("-- Starting {} test --", methodName);
 
        AvroSchema<AvroEncodedPojo> avroSchema =
-           AvroSchema.of(AvroEncodedPojo.class);
+           AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
+                   withPojo(AvroEncodedPojo.class).build());
 
        Consumer<AvroEncodedPojo> consumer = pulsarClient
            .newConsumer(avroSchema)
@@ -355,7 +359,8 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
             ).get();
 
         Consumer<AvroEncodedPojo> consumer = pulsarClient
-            .newConsumer(AvroSchema.of(AvroEncodedPojo.class))
+            .newConsumer(AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
+                    withPojo(AvroEncodedPojo.class).build()))
             .topic("persistent://my-property/use/my-ns/my-topic1")
             .subscriptionName("my-subscriber-name")
             .subscribe();
@@ -454,7 +459,8 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
        log.info("-- Starting {} test --", methodName);
 
        AvroSchema<AvroEncodedPojo> avroSchema =
-           AvroSchema.of(AvroEncodedPojo.class);
+           AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
+                   withPojo(AvroEncodedPojo.class).build());
 
        Producer<AvroEncodedPojo> producer = pulsarClient
            .newProducer(avroSchema)
@@ -502,7 +508,8 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
        log.info("-- Starting {} test --", methodName);
 
        AvroSchema<AvroEncodedPojo> avroSchema =
-           AvroSchema.of(AvroEncodedPojo.class);
+           AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
+                   withPojo(AvroEncodedPojo.class).build());
 
        Producer<AvroEncodedPojo> producer = pulsarClient
            .newProducer(avroSchema)
@@ -548,7 +555,8 @@ public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase {
         log.info("-- Starting {} test --", methodName);
 
         AvroSchema<AvroEncodedPojo> avroSchema =
-            AvroSchema.of(AvroEncodedPojo.class);
+            AvroSchema.of(SchemaDefinition.<AvroEncodedPojo>builder().
+                    withPojo(AvroEncodedPojo.class).build());
 
         try (Producer<AvroEncodedPojo> producer = pulsarClient
             .newProducer(avroSchema)
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
index e17fb06..d0bff30 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -19,10 +19,10 @@
 package org.apache.pulsar.client.api;
 
 import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Map;
+
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -169,58 +169,43 @@ public interface Schema<T> {
     }
 
     /**
-     * Create a Avro schema type by extracting the fields of the specified class.
-     *
-     * @param clazz the POJO class to be used to extract the Avro schema
-     * @return a Schema instance
-     */
-    static <T> Schema<T> AVRO(Class<T> clazz) {
-        return DefaultImplementation.newAvroSchema(clazz);
-    }
-
-    /**
-     * Create a Avro schema type using the provided avro schema definition.
+     * Create a  Avro schema type by default configuration of the class
      *
-     * @param schemaDefinition avro schema definition
+     * @param pojo the POJO class to be used to extract the Avro schema
      * @return a Schema instance
      */
-    static <T> Schema<T> AVRO(String schemaDefinition) {
-        return AVRO(schemaDefinition, Collections.emptyMap());
+    static <T> Schema<T> AVRO(Class<T> pojo) {
+        return DefaultImplementation.newAvroSchema(SchemaDefinition.builder().withPojo(pojo).build());
     }
 
     /**
-     * Create a Avro schema type using the provided avro schema definition.
+     * Create a Avro schema type with schema definition
      *
-     * @param schemaDefinition avro schema definition
-     * @param properties pulsar schema properties
+     * @param schemaDefinition the definition of the schema
      * @return a Schema instance
      */
-    static <T> Schema<T> AVRO(String schemaDefinition, Map<String, String> properties) {
-        return DefaultImplementation.newAvroSchema(schemaDefinition, properties);
+    static <T> Schema<T> AVRO(SchemaDefinition<T> schemaDefinition) {
+        return DefaultImplementation.newAvroSchema(schemaDefinition);
     }
 
     /**
      * Create a JSON schema type by extracting the fields of the specified class.
      *
-     * @param clazz the POJO class to be used to extract the JSON schema
+     * @param pojo the POJO class to be used to extract the JSON schema
      * @return a Schema instance
      */
-    static <T> Schema<T> JSON(Class<T> clazz) {
-        return DefaultImplementation.newJSONSchema(clazz);
+    static <T> Schema<T> JSON(Class<T> pojo) {
+        return DefaultImplementation.newJSONSchema(SchemaDefinition.builder().withPojo(pojo).build());
     }
 
     /**
-     * Create a JSON schema type by extracting the fields of the specified class.
+     * Create a JSON schema type with schema definition
      *
-     * @param clazz the POJO class to be used to extract the JSON schema
-     * @param schemaDefinition schema definition json string (using avro schema syntax)
-     * @param properties pulsar schema properties
+     * @param schemaDefinition the definition of the schema
      * @return a Schema instance
      */
-    static <T> Schema<T> JSON(Class<T> clazz,
-                              String schemaDefinition,
-                              Map<String, String> properties) {
-        return DefaultImplementation.newJSONSchema(clazz, schemaDefinition, properties);
+    static <T> Schema<T> JSON(SchemaDefinition schemaDefinition) {
+        return DefaultImplementation.newJSONSchema(schemaDefinition);
     }
 
     /**
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java
new file mode 100644
index 0000000..daf90df
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinition.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.schema;
+
+import org.apache.pulsar.client.internal.DefaultImplementation;
+
+import java.util.Map;
+
+
+public interface SchemaDefinition<T> {
+
+    /**
+     * Get a new builder instance that can used to configure and build a {@link SchemaDefinition} instance.
+     *
+     * @return the {@link SchemaDefinition}
+     */
+    static  <T> SchemaDefinitionBuilder<T> builder() {
+        return DefaultImplementation.newSchemaDefinitionBuilder();
+    }
+
+    /**
+     * get schema whether always allow null or not
+     *
+     * @return schema always null or not
+     */
+    public boolean getAlwaysAllowNull();
+
+    /**
+     * Get schema class
+     *
+     * @return schema class
+     */
+    public Map<String, String> getProperties();
+
+    /**
+     * Get json schema definition
+     *
+     * @return schema class
+     */
+    public String getJsonDef();
+
+    /**
+     * Get pojo schema definition
+     *
+     * @return pojo schema
+     */
+    public Class<T> getPojo();
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java
new file mode 100644
index 0000000..77bb363
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/SchemaDefinitionBuilder.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.schema;
+
+
+
+import java.util.Map;
+
+/**
+ * Builder to build schema definition {@link SchemaDefinition}.
+ */
+public interface SchemaDefinitionBuilder<T> {
+
+    /**
+     * Set schema whether always allow null or not
+     *
+     * @param alwaysAllowNull definition null or not
+     * @return schema definition builder
+     */
+    SchemaDefinitionBuilder<T> withAlwaysAllowNull(boolean alwaysAllowNull);
+
+    /**
+     * Set schema info properties
+     *
+     * @param properties schema info properties
+     * @return schema definition builder
+     */
+    SchemaDefinitionBuilder<T> withProperties(Map<String, String> properties);
+
+    /**
+     * Set schema info properties
+     *
+     * @param key property key
+     * @param value property value
+     *
+     * @return record schema definition
+     */
+    SchemaDefinitionBuilder<T> addProperty(String key, String value);
+
+    /**
+     * Set schema of pojo definition
+     *
+     * @param pojo pojo schema definition
+     *
+     * @return record schema definition
+     */
+    SchemaDefinitionBuilder<T> withPojo(Class pojo);
+
+    /**
+     * Set schema of json definition
+     *
+     * @param jsonDefinition json schema definition
+     *
+     * @return record schema definition
+     */
+    SchemaDefinitionBuilder<T> withJsonDef(String jsonDefinition);
+
+    /**
+     * Build the schema definition.
+     *
+     * @return the schema definition.
+     */
+    SchemaDefinition<T> build();
+
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
index abd368e..44dbc10 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
@@ -37,9 +37,7 @@ import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
-import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.api.schema.GenericSchema;
-import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
+import org.apache.pulsar.client.api.schema.*;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -71,6 +69,13 @@ public class DefaultImplementation {
     private static final Constructor<Authentication> AUTHENTICATION_TLS_String_String = getConstructor(
             "org.apache.pulsar.client.impl.auth.AuthenticationTls", String.class, String.class);
 
+    private static final Constructor<SchemaDefinitionBuilder> SCHEMA_DEFINITION_BUILDER_CONSTRUCTOR = getConstructor(
+            "org.apache.pulsar.client.impl.schema.SchemaDefinitionBuilderImpl");
+
+    public static <T> SchemaDefinitionBuilder<T> newSchemaDefinitionBuilder() {
+        return catchExceptions(() -> (SchemaDefinitionBuilder<T>)SCHEMA_DEFINITION_BUILDER_CONSTRUCTOR.newInstance());
+    }
+
     public static ClientBuilder newClientBuilder() {
         return catchExceptions(() -> CLIENT_BUILDER_IMPL.newInstance());
     }
@@ -182,16 +187,10 @@ public class DefaultImplementation {
                         .newInstance());
     }
 
-    public static <T> Schema<T> newAvroSchema(Class<T> clazz) {
+    public static <T> Schema<T> newAvroSchema(SchemaDefinition schemaDefinition) {
         return catchExceptions(
-                () -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.AvroSchema", "of", Class.class)
-                        .invoke(null, clazz));
-    }
-
-    public static <T> Schema<T> newAvroSchema(String schemaDefinition, Map<String, String> properties) {
-        return catchExceptions(
-                () -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.AvroSchema", "of", String.class, Map.class)
-                        .invoke(null, schemaDefinition, properties));
+                () -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.AvroSchema", "of", SchemaDefinition.class)
+                        .invoke(null,schemaDefinition));
     }
 
     public static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> newProtobufSchema(Class<T> clazz) {
@@ -200,18 +199,10 @@ public class DefaultImplementation {
                         .invoke(null, clazz));
     }
 
-    public static <T> Schema<T> newJSONSchema(Class<T> clazz) {
-        return catchExceptions(
-                () -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.JSONSchema", "of", Class.class)
-                        .invoke(null, clazz));
-    }
-
-    public static <T> Schema<T> newJSONSchema(Class<T> clazz,
-                                              String schemaDefinition,
-                                              Map<String, String> properties) {
+    public static <T> Schema<T> newJSONSchema(SchemaDefinition schemaDefinition) {
         return catchExceptions(
-                () -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.JSONSchema", "of", Class.class, String.class, Map.class)
-                        .invoke(null, clazz, schemaDefinition, properties));
+                () -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.JSONSchema", "of", SchemaDefinition.class)
+                        .invoke(null, schemaDefinition));
     }
 
     public static Schema<GenericRecord> newAutoConsumeSchema() {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
index c9726aa..a00112c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java
@@ -26,11 +26,12 @@ import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.util.Collections;
 import java.util.Map;
 
 /**
@@ -48,12 +49,11 @@ public class AvroSchema<T> extends StructSchema<T> {
             new ThreadLocal<>();
 
     private AvroSchema(org.apache.avro.Schema schema,
-                       Map<String, String> properties) {
+                       SchemaDefinition schemaDefinition) {
         super(
             SchemaType.AVRO,
             schema,
-            properties);
-
+            schemaDefinition.getProperties());
         this.byteArrayOutputStream = new ByteArrayOutputStream();
         this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, this.encoder);
         this.datumWriter = new ReflectDatumWriter<>(this.schema);
@@ -87,23 +87,23 @@ public class AvroSchema<T> extends StructSchema<T> {
         }
     }
 
-    public static <T> AvroSchema<T> of(Class<T> pojo) {
-        return new AvroSchema<>(createAvroSchema(pojo), Collections.emptyMap());
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return this.schemaInfo;
     }
 
-    public static <T> AvroSchema<T> of(Class<T> pojo, Map<String, String> properties) {
-        return new AvroSchema<>(createAvroSchema(pojo), properties);
+    public static <T> AvroSchema<T> of(SchemaDefinition<T> schemaDefinition) {
+        return schemaDefinition.getJsonDef() == null ?
+                new AvroSchema<>(createAvroSchema(schemaDefinition), schemaDefinition) : new AvroSchema<>(parseAvroSchema(schemaDefinition.getJsonDef()), schemaDefinition);
     }
 
-    /**
-     * Create an Avro schema based on provided schema definition.
-     *
-     * @param schemaDefinition avro schema definition
-     * @param properties schema properties
-     * @return avro schema instance
-     */
-    public static <T> AvroSchema<T> of(String schemaDefinition, Map<String, String> properties) {
-        return new AvroSchema<>(parseAvroSchema(schemaDefinition), properties);
+    public static <T> AvroSchema<T> of(Class<T> pojo) {
+        return AvroSchema.of(SchemaDefinition.<T>builder().withPojo(pojo).build());
+    }
+
+    public static <T> AvroSchema<T> of(Class<T> pojo, Map<String, String> properties) {
+        SchemaDefinition<T> schemaDefinition = SchemaDefinition.<T>builder().withPojo(pojo).withProperties(properties).build();
+        return new AvroSchema<>(createAvroSchema(schemaDefinition), schemaDefinition);
     }
 
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
index b915ff2..629b769 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/JSONSchema.java
@@ -26,11 +26,11 @@ import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
 import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.Map;
 
 /**
@@ -38,7 +38,6 @@ import java.util.Map;
  */
 @Slf4j
 public class JSONSchema<T> extends StructSchema<T> {
-
     // Cannot use org.apache.pulsar.common.util.ObjectMapperFactory.getThreadLocal() because it does not
     // return shaded version of object mapper
     private static final ThreadLocal<ObjectMapper> JSON_MAPPER = ThreadLocal.withInitial(() -> {
@@ -51,14 +50,13 @@ public class JSONSchema<T> extends StructSchema<T> {
     private final Class<T> pojo;
     private final ObjectMapper objectMapper;
 
-    private JSONSchema(Class<T> pojo,
-                       org.apache.avro.Schema schema,
-                       Map<String, String> properties) {
+    private JSONSchema(org.apache.avro.Schema schema,
+                        SchemaDefinition<T> schemaDefinition) {
         super(
             SchemaType.JSON,
             schema,
-            properties);
-        this.pojo = pojo;
+            schemaDefinition.getProperties());
+        this.pojo = schemaDefinition.getPojo();
         this.objectMapper = JSON_MAPPER.get();
     }
 
@@ -89,6 +87,7 @@ public class JSONSchema<T> extends StructSchema<T> {
      * Implemented for backwards compatibility reasons
      * since the original schema generated by JSONSchema was based off the json schema standard
      * since then we have standardized on Avro
+     *
      * @return
      */
     public SchemaInfo getBackwardsCompatibleJsonSchemaInfo() {
@@ -108,25 +107,18 @@ public class JSONSchema<T> extends StructSchema<T> {
         return backwardsCompatibleSchemaInfo;
     }
 
+    public static <T> JSONSchema<T> of(SchemaDefinition<T> schemaDefinition) {
+        String jsonDef = schemaDefinition.getJsonDef();
+            return jsonDef == null ? new JSONSchema<>(createAvroSchema(schemaDefinition), schemaDefinition) :
+                    new JSONSchema<>(parseAvroSchema(jsonDef), schemaDefinition);
+    }
+
     public static <T> JSONSchema<T> of(Class<T> pojo) {
-        return new JSONSchema<>(pojo, createAvroSchema(pojo), Collections.emptyMap());
+        return JSONSchema.of(SchemaDefinition.<T>builder().withPojo(pojo).build());
     }
 
     public static <T> JSONSchema<T> of(Class<T> pojo, Map<String, String> properties) {
-        return new JSONSchema<>(pojo, createAvroSchema(pojo), properties);
+        return JSONSchema.of(SchemaDefinition.<T>builder().withPojo(pojo).withProperties(properties).build());
     }
 
-    /**
-     * Create an json schema based on provided schema definition.
-     *
-     * @param pojo pojo class
-     * @param schemaDefinition avro schema definition
-     * @param properties schema properties
-     * @return avro schema instance
-     */
-    public static <T> JSONSchema<T> of(Class<T> pojo,
-                                       String schemaDefinition,
-                                       Map<String, String> properties) {
-        return new JSONSchema<>(pojo, parseAvroSchema(schemaDefinition), properties);
-    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java
new file mode 100644
index 0000000..2db6cf4
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionBuilderImpl.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.api.schema.SchemaDefinitionBuilder;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Builder to build {@link org.apache.pulsar.client.api.schema.GenericRecord}.
+ */
+public class SchemaDefinitionBuilderImpl<T> implements SchemaDefinitionBuilder<T> {
+
+    public static final String ALWAYS_ALLOW_NULL = "__alwaysAllowNull";
+
+    /**
+     * the schema definition class
+     */
+    private  Class<T> clazz;
+    /**
+     * The flag of schema type always allow null
+     *
+     * If it's true, will make all of the pojo field generate schema
+     * define default can be null,false default can't be null, but it's
+     * false you can define the field by yourself by the annotation@Nullable
+     *
+     */
+    private boolean alwaysAllowNull = true;
+
+    /**
+     * The schema info properties
+     */
+    private Map<String, String> properties = new HashMap<>();
+
+    /**
+     * The json schema definition
+     */
+    private String jsonDef;
+
+    @Override
+    public SchemaDefinitionBuilder<T> withAlwaysAllowNull(boolean alwaysAllowNull) {
+        this.alwaysAllowNull = alwaysAllowNull;
+        return this;
+    }
+
+    @Override
+    public SchemaDefinitionBuilder<T> addProperty(String key, String value) {
+        this.properties.put(key, value);
+        return this;
+    }
+
+    @Override
+    public SchemaDefinitionBuilder<T> withPojo(Class clazz) {
+        this.clazz = clazz;
+        return this;
+    }
+
+    @Override
+    public SchemaDefinitionBuilder<T> withJsonDef(String jsonDef) {
+        this.jsonDef = jsonDef;
+        return this;
+    }
+
+
+    @Override
+    public SchemaDefinitionBuilder<T> withProperties(Map<String,String> properties) {
+        this.properties = properties;
+        return this;
+    }
+
+    @Override
+    public  SchemaDefinition<T> build() {
+        properties.put(ALWAYS_ALLOW_NULL, this.alwaysAllowNull ? "true" : "false");
+        return new SchemaDefinitionImpl(clazz, jsonDef, alwaysAllowNull, properties);
+
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java
new file mode 100644
index 0000000..04f1a24
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/SchemaDefinitionImpl.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.schema;
+
+
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A json schema definition
+ * {@link org.apache.pulsar.client.api.schema.SchemaDefinition} for the json schema definition.
+ */
+public class SchemaDefinitionImpl<T> implements SchemaDefinition<T>{
+
+    /**
+     * the schema definition class
+     */
+    private  Class<T> pojo;
+    /**
+     * The flag of schema type always allow null
+     *
+     * If it's true, will make all of the pojo field generate schema
+     * define default can be null,false default can't be null, but it's
+     * false you can define the field by yourself by the annotation@Nullable
+     *
+     */
+    private boolean alwaysAllowNull;
+
+    private Map<String, String> properties;
+
+    private String jsonDef;
+
+    public SchemaDefinitionImpl(Class<T> pojo, String jsonDef, boolean alwaysAllowNull, Map<String,String> properties) {
+        this.alwaysAllowNull = alwaysAllowNull;
+        this.properties = properties;
+        this.jsonDef = jsonDef;
+        this.pojo = pojo;
+    }
+    /**
+     * get schema whether always allow null or not
+     *
+     * @return schema always null or not
+     */
+    public boolean getAlwaysAllowNull() {
+
+        return alwaysAllowNull;
+    }
+
+    /**
+     * Get json schema definition
+     *
+     * @return schema class
+     */
+    public String getJsonDef() {
+
+        return jsonDef;
+    }
+    /**
+     * Get pojo schema definition
+     *
+     * @return pojo class
+     */
+    @Override
+    public Class<T> getPojo() {
+        return pojo;
+    }
+
+    /**
+     * Get schema class
+     *
+     * @return schema class
+     */
+    public Map<String, String> getProperties() {
+
+        return properties;
+    }
+
+
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
index 33ce9de..31156d4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/StructSchema.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import org.apache.avro.Schema.Parser;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
@@ -62,13 +63,14 @@ abstract class StructSchema<T> implements Schema<T> {
         return this.schemaInfo;
     }
 
-    protected static <T> org.apache.avro.Schema createAvroSchema(Class<T> pojo) {
-        return ReflectData.AllowNull.get().getSchema(pojo);
+    protected static org.apache.avro.Schema createAvroSchema(SchemaDefinition schemaDefinition) {
+        Class pojo = schemaDefinition.getPojo();
+        return schemaDefinition.getAlwaysAllowNull() ? ReflectData.AllowNull.get().getSchema(pojo) : ReflectData.get().getSchema(pojo);
     }
 
-    protected static org.apache.avro.Schema parseAvroSchema(String definition) {
+    protected static org.apache.avro.Schema parseAvroSchema(String jsonDef) {
         Parser parser = new Parser();
-        return parser.parse(definition);
+        return parser.parse(jsonDef);
     }
 
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
index 203c226..bbd753b 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AvroSchemaTest.java
@@ -20,29 +20,34 @@ package org.apache.pulsar.client.impl.schema;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.FOO_FIELDS;
-import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_JSON;
+import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_AVRO_NOT_ALLOW_NULL;
+import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_AVRO_ALLOW_NULL;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.fail;
 
 import java.util.Arrays;
-import java.util.Collections;
+
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.avro.Schema;
+import org.apache.pulsar.client.api.SchemaSerializationException;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.avro.SchemaValidationException;
 import org.apache.avro.SchemaValidator;
 import org.apache.avro.SchemaValidatorBuilder;
 import org.apache.avro.reflect.AvroDefault;
 import org.apache.avro.reflect.Nullable;
 import org.apache.avro.reflect.ReflectData;
-import org.apache.pulsar.client.api.SchemaSerializationException;
+
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+
 @Slf4j
 public class AvroSchemaTest {
 
@@ -88,7 +93,7 @@ public class AvroSchemaTest {
             // expected
         }
 
-        AvroSchema<StructWithAnnotations> schema3 = AvroSchema.of(schemaDef1, Collections.emptyMap());
+        AvroSchema<StructWithAnnotations> schema3 = AvroSchema.of(SchemaDefinition.<StructWithAnnotations>builder().withJsonDef(schemaDef1).build());
         String schemaDef3 = new String(schema3.getSchemaInfo().getSchema(), UTF_8);
         assertEquals(schemaDef1, schemaDef3);
         assertNotEquals(schemaDef2, schemaDef3);
@@ -108,12 +113,12 @@ public class AvroSchemaTest {
     }
 
     @Test
-    public void testSchema() {
-        AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class);
+    public void testNotAllowNullSchema() {
+        AvroSchema<Foo> avroSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
         assertEquals(avroSchema.getSchemaInfo().getType(), SchemaType.AVRO);
         Schema.Parser parser = new Schema.Parser();
         String schemaJson = new String(avroSchema.getSchemaInfo().getSchema());
-        assertEquals(schemaJson, SCHEMA_JSON);
+        assertEquals(schemaJson, SCHEMA_AVRO_NOT_ALLOW_NULL);
         Schema schema = parser.parse(schemaJson);
 
         for (String fieldName : FOO_FIELDS) {
@@ -123,12 +128,66 @@ public class AvroSchemaTest {
             if (field.name().equals("field4")) {
                 Assert.assertNotNull(field.schema().getTypes().get(1).getField("field1"));
             }
+            if (field.name().equals("fieldUnableNull")) {
+                Assert.assertNotNull(field.schema().getType());
+            }
         }
     }
 
     @Test
-    public void testEncodeAndDecode() {
-        AvroSchema<Foo> avroSchema = AvroSchema.of(Foo.class, null);
+    public void testAllowNullSchema() {
+        AvroSchema<Foo> avroSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        assertEquals(avroSchema.getSchemaInfo().getType(), SchemaType.AVRO);
+        Schema.Parser parser = new Schema.Parser();
+        String schemaJson = new String(avroSchema.getSchemaInfo().getSchema());
+        assertEquals(schemaJson, SCHEMA_AVRO_ALLOW_NULL);
+        Schema schema = parser.parse(schemaJson);
+
+        for (String fieldName : FOO_FIELDS) {
+            Schema.Field field = schema.getField(fieldName);
+            Assert.assertNotNull(field);
+
+            if (field.name().equals("field4")) {
+                Assert.assertNotNull(field.schema().getTypes().get(1).getField("field1"));
+            }
+            if (field.name().equals("fieldUnableNull")) {
+                Assert.assertNotNull(field.schema().getType());
+            }
+        }
+    }
+
+    @Test
+    public void testNotAllowNullEncodeAndDecode() {
+        AvroSchema<Foo> avroSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
+
+        Foo foo1 = new Foo();
+        foo1.setField1("foo1");
+        foo1.setField2("bar1");
+        foo1.setField4(new Bar());
+        foo1.setFieldUnableNull("notNull");
+
+        Foo foo2 = new Foo();
+        foo2.setField1("foo2");
+        foo2.setField2("bar2");
+
+        byte[] bytes1 = avroSchema.encode(foo1);
+        Foo object1 = avroSchema.decode(bytes1);
+        Assert.assertTrue(bytes1.length > 0);
+        assertEquals(object1, foo1);
+
+        try {
+
+            avroSchema.encode(foo2);
+
+        } catch (Exception e) {
+            Assert.assertTrue(e instanceof SchemaSerializationException);
+        }
+
+    }
+
+    @Test
+    public void testAllowNullEncodeAndDecode() {
+        AvroSchema<Foo> avroSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
 
         Foo foo1 = new Foo();
         foo1.setField1("foo1");
@@ -150,6 +209,8 @@ public class AvroSchemaTest {
 
         assertEquals(object1, foo1);
         assertEquals(object2, foo2);
+
     }
 
+
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java
index 9184faa..5efb82b 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/JSONSchemaTest.java
@@ -24,7 +24,7 @@ import java.util.List;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
-import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.DerivedFoo;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
@@ -35,18 +35,20 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.FOO_FIELDS;
-import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_JSON;
+import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_JSON_NOT_ALLOW_NULL;
+import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_JSON_ALLOW_NULL;
+import static org.testng.Assert.assertEquals;
 
 @Slf4j
 public class JSONSchemaTest {
 
     @Test
-    public void testSchema() {
-        JSONSchema<Foo> jsonSchema = JSONSchema.of(Foo.class);
+    public void testNotAllowNullSchema() {
+        JSONSchema<Foo> jsonSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
         Assert.assertEquals(jsonSchema.getSchemaInfo().getType(), SchemaType.JSON);
         Schema.Parser parser = new Schema.Parser();
         String schemaJson = new String(jsonSchema.getSchemaInfo().getSchema());
-        Assert.assertEquals(schemaJson, SCHEMA_JSON);
+        Assert.assertEquals(schemaJson, SCHEMA_JSON_NOT_ALLOW_NULL);
         Schema schema = parser.parse(schemaJson);
 
         for (String fieldName : FOO_FIELDS) {
@@ -56,12 +58,37 @@ public class JSONSchemaTest {
             if (field.name().equals("field4")) {
                 Assert.assertNotNull(field.schema().getTypes().get(1).getField("field1"));
             }
+            if (field.name().equals("fieldUnableNull")) {
+                Assert.assertNotNull(field.schema().getType());
+            }
         }
     }
 
     @Test
-    public void testEncodeAndDecode() {
-        JSONSchema<Foo> jsonSchema = JSONSchema.of(Foo.class, null);
+    public void testAllowNullSchema() {
+        JSONSchema<Foo> jsonSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        Assert.assertEquals(jsonSchema.getSchemaInfo().getType(), SchemaType.JSON);
+        Schema.Parser parser = new Schema.Parser();
+        String schemaJson = new String(jsonSchema.getSchemaInfo().getSchema());
+        Assert.assertEquals(schemaJson, SCHEMA_JSON_ALLOW_NULL);
+        Schema schema = parser.parse(schemaJson);
+
+        for (String fieldName : FOO_FIELDS) {
+            Schema.Field field = schema.getField(fieldName);
+            Assert.assertNotNull(field);
+
+            if (field.name().equals("field4")) {
+                Assert.assertNotNull(field.schema().getTypes().get(1).getField("field1"));
+            }
+            if (field.name().equals("fieldUnableNull")) {
+                Assert.assertNotNull(field.schema().getType());
+            }
+        }
+    }
+
+    @Test
+    public void testAllowNullEncodeAndDecode() {
+        JSONSchema<Foo> jsonSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
 
         Bar bar = new Bar();
         bar.setField1(true);
@@ -90,9 +117,65 @@ public class JSONSchemaTest {
     }
 
     @Test
-    public void testNestedClasses() {
-        JSONSchema<NestedBar> jsonSchema = JSONSchema.of(NestedBar.class, null);
-        JSONSchema<NestedBarList> listJsonSchema = JSONSchema.of(NestedBarList.class, null);
+    public void testNotAllowNullEncodeAndDecode() {
+        JSONSchema<Foo> jsonSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
+
+        Foo foo1 = new Foo();
+        foo1.setField1("foo1");
+        foo1.setField2("bar1");
+        foo1.setField4(new Bar());
+        foo1.setFieldUnableNull("notNull");
+
+        Foo foo2 = new Foo();
+        foo2.setField1("foo2");
+        foo2.setField2("bar2");
+
+        byte[] bytes1 = jsonSchema.encode(foo1);
+        Foo object1 = jsonSchema.decode(bytes1);
+        Assert.assertTrue(bytes1.length > 0);
+        assertEquals(object1, foo1);
+
+        try {
+
+            jsonSchema.encode(foo2);
+
+        } catch (Exception e) {
+            Assert.assertTrue(e instanceof SchemaSerializationException);
+        }
+
+    }
+
+    @Test
+    public void testAllowNullNestedClasses() {
+        JSONSchema<NestedBar> jsonSchema = JSONSchema.of(SchemaDefinition.<NestedBar>builder().withPojo(NestedBar.class).build());
+        JSONSchema<NestedBarList> listJsonSchema = JSONSchema.of(SchemaDefinition.<NestedBarList>builder().withPojo(NestedBarList.class).build());
+
+        Bar bar = new Bar();
+        bar.setField1(true);
+
+        NestedBar nested = new NestedBar();
+        nested.setField1(true);
+        nested.setNested(bar);
+
+        byte[] bytes = jsonSchema.encode(nested);
+        Assert.assertTrue(bytes.length > 0);
+        Assert.assertEquals(jsonSchema.decode(bytes), nested);
+
+        List<Bar> list = Collections.singletonList(bar);
+        NestedBarList nestedList = new NestedBarList();
+        nestedList.setField1(true);
+        nestedList.setList(list);
+
+        bytes = listJsonSchema.encode(nestedList);
+        Assert.assertTrue(bytes.length > 0);
+
+        Assert.assertEquals(listJsonSchema.decode(bytes), nestedList);
+    }
+
+    @Test
+    public void testNotAllowNullNestedClasses() {
+        JSONSchema<NestedBar> jsonSchema = JSONSchema.of(SchemaDefinition.<NestedBar>builder().withPojo(NestedBar.class).withAlwaysAllowNull(false).build());
+        JSONSchema<NestedBarList> listJsonSchema = JSONSchema.of(SchemaDefinition.<NestedBarList>builder().withPojo(NestedBarList.class).withAlwaysAllowNull(false).build());
 
         Bar bar = new Bar();
         bar.setField1(true);
@@ -117,7 +200,59 @@ public class JSONSchemaTest {
     }
 
     @Test
-    public void testCorrectPolymorphism() {
+    public void testNotAllowNullCorrectPolymorphism() {
+        Bar bar = new Bar();
+        bar.setField1(true);
+
+        DerivedFoo derivedFoo = new DerivedFoo();
+        derivedFoo.setField1("foo1");
+        derivedFoo.setField2("bar2");
+        derivedFoo.setField3(4);
+        derivedFoo.setField4(bar);
+        derivedFoo.setField5("derived1");
+        derivedFoo.setField6(2);
+
+        Foo foo = new Foo();
+        foo.setField1("foo1");
+        foo.setField2("bar2");
+        foo.setField3(4);
+        foo.setField4(bar);
+
+        SchemaTestUtils.DerivedDerivedFoo derivedDerivedFoo = new SchemaTestUtils.DerivedDerivedFoo();
+        derivedDerivedFoo.setField1("foo1");
+        derivedDerivedFoo.setField2("bar2");
+        derivedDerivedFoo.setField3(4);
+        derivedDerivedFoo.setField4(bar);
+        derivedDerivedFoo.setField5("derived1");
+        derivedDerivedFoo.setField6(2);
+        derivedDerivedFoo.setFoo2(foo);
+        derivedDerivedFoo.setDerivedFoo(derivedFoo);
+
+        // schema for base class
+        JSONSchema<Foo> baseJsonSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        Assert.assertEquals(baseJsonSchema.decode(baseJsonSchema.encode(foo)), foo);
+        Assert.assertEquals(baseJsonSchema.decode(baseJsonSchema.encode(derivedFoo)), foo);
+        Assert.assertEquals(baseJsonSchema.decode(baseJsonSchema.encode(derivedDerivedFoo)), foo);
+
+        // schema for derived class
+        JSONSchema<DerivedFoo> derivedJsonSchema = JSONSchema.of(SchemaDefinition.<DerivedFoo>builder().withPojo(DerivedFoo.class).build());
+        Assert.assertEquals(derivedJsonSchema.decode(derivedJsonSchema.encode(derivedFoo)), derivedFoo);
+        Assert.assertEquals(derivedJsonSchema.decode(derivedJsonSchema.encode(derivedDerivedFoo)), derivedFoo);
+
+        //schema for derived derived class
+        JSONSchema<SchemaTestUtils.DerivedDerivedFoo> derivedDerivedJsonSchema
+                = JSONSchema.of(SchemaDefinition.<SchemaTestUtils.DerivedDerivedFoo>builder().withPojo(SchemaTestUtils.DerivedDerivedFoo.class).build());
+        Assert.assertEquals(derivedDerivedJsonSchema.decode(derivedDerivedJsonSchema.encode(derivedDerivedFoo)), derivedDerivedFoo);
+    }
+
+    @Test(expectedExceptions = SchemaSerializationException.class)
+    public void testAllowNullDecodeWithInvalidContent() {
+        JSONSchema<Foo> jsonSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        jsonSchema.decode(new byte[0]);
+    }
+
+    @Test
+    public void testAllowNullCorrectPolymorphism() {
         Bar bar = new Bar();
         bar.setField1(true);
 
@@ -146,25 +281,25 @@ public class JSONSchemaTest {
         derivedDerivedFoo.setDerivedFoo(derivedFoo);
 
         // schema for base class
-        JSONSchema<Foo> baseJsonSchema = JSONSchema.of(Foo.class);
+        JSONSchema<Foo> baseJsonSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
         Assert.assertEquals(baseJsonSchema.decode(baseJsonSchema.encode(foo)), foo);
         Assert.assertEquals(baseJsonSchema.decode(baseJsonSchema.encode(derivedFoo)), foo);
         Assert.assertEquals(baseJsonSchema.decode(baseJsonSchema.encode(derivedDerivedFoo)), foo);
 
         // schema for derived class
-        JSONSchema<DerivedFoo> derivedJsonSchema = JSONSchema.of(DerivedFoo.class);
+        JSONSchema<DerivedFoo> derivedJsonSchema = JSONSchema.of(SchemaDefinition.<DerivedFoo>builder().withPojo(DerivedFoo.class).withAlwaysAllowNull(false).build());
         Assert.assertEquals(derivedJsonSchema.decode(derivedJsonSchema.encode(derivedFoo)), derivedFoo);
         Assert.assertEquals(derivedJsonSchema.decode(derivedJsonSchema.encode(derivedDerivedFoo)), derivedFoo);
 
         //schema for derived derived class
         JSONSchema<SchemaTestUtils.DerivedDerivedFoo> derivedDerivedJsonSchema
-                = JSONSchema.of(SchemaTestUtils.DerivedDerivedFoo.class);
+                = JSONSchema.of(SchemaDefinition.<SchemaTestUtils.DerivedDerivedFoo>builder().withPojo(SchemaTestUtils.DerivedDerivedFoo.class).withAlwaysAllowNull(false).build());
         Assert.assertEquals(derivedDerivedJsonSchema.decode(derivedDerivedJsonSchema.encode(derivedDerivedFoo)), derivedDerivedFoo);
     }
 
     @Test(expectedExceptions = SchemaSerializationException.class)
-    public void testDecodeWithInvalidContent() {
-        JSONSchema<Foo> jsonSchema = JSONSchema.of(Foo.class);
+    public void testNotAllowNullDecodeWithInvalidContent() {
+        JSONSchema<Foo> jsonSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
         jsonSchema.decode(new byte[0]);
     }
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
index 2963d62..a32e31d 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
@@ -22,9 +22,7 @@ import static org.testng.Assert.assertEquals;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.impl.schema.AvroSchema;
-import org.apache.pulsar.client.impl.schema.JSONSchema;
-import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Color;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
@@ -37,9 +35,9 @@ import org.testng.annotations.Test;
 public class KeyValueSchemaTest {
 
     @Test
-    public void testAvroSchemaCreate() {
-        AvroSchema<Foo> fooSchema = AvroSchema.of(Foo.class);
-        AvroSchema<Bar> barSchema = AvroSchema.of(Bar.class);
+    public void testAllowNullAvroSchemaCreate() {
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
 
         Schema<KeyValue<Foo, Bar>> keyValueSchema1 = Schema.KeyValue(fooSchema, barSchema);
         Schema<KeyValue<Foo, Bar>> keyValueSchema2 = Schema.KeyValue(Foo.class, Bar.class, SchemaType.AVRO);
@@ -47,13 +45,39 @@ public class KeyValueSchemaTest {
         assertEquals(keyValueSchema1.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
         assertEquals(keyValueSchema2.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
 
-        assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
+                SchemaType.AVRO);
+        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
+                SchemaType.AVRO);
+        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
+                SchemaType.AVRO);
+        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
+                SchemaType.AVRO);
+
+        String schemaInfo1 = new String(keyValueSchema1.getSchemaInfo().getSchema());
+        String schemaInfo2 = new String(keyValueSchema2.getSchemaInfo().getSchema());
+        assertEquals(schemaInfo1, schemaInfo2);
+    }
+
+    @Test
+    public void testNotAllowNullAvroSchemaCreate() {
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).withAlwaysAllowNull(false).build());
+
+        Schema<KeyValue<Foo, Bar>> keyValueSchema1 = Schema.KeyValue(fooSchema, barSchema);
+        Schema<KeyValue<Foo, Bar>> keyValueSchema2 = Schema.KeyValue(AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build()),
+                AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).withAlwaysAllowNull(false).build()));
+
+        assertEquals(keyValueSchema1.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
+        assertEquals(keyValueSchema2.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
+
+        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
             SchemaType.AVRO);
-        assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
             SchemaType.AVRO);
-        assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
             SchemaType.AVRO);
-        assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
             SchemaType.AVRO);
 
         String schemaInfo1 = new String(keyValueSchema1.getSchemaInfo().getSchema());
@@ -62,9 +86,9 @@ public class KeyValueSchemaTest {
     }
 
     @Test
-    public void testJsonSchemaCreate() {
-        JSONSchema<Foo> fooSchema = JSONSchema.of(Foo.class);
-        JSONSchema<Bar> barSchema = JSONSchema.of(Bar.class);
+    public void testAllowNullJsonSchemaCreate() {
+        JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
 
         Schema<KeyValue<Foo, Bar>> keyValueSchema1 = Schema.KeyValue(fooSchema, barSchema);
         Schema<KeyValue<Foo, Bar>> keyValueSchema2 = Schema.KeyValue(Foo.class, Bar.class, SchemaType.JSON);
@@ -74,17 +98,53 @@ public class KeyValueSchemaTest {
         assertEquals(keyValueSchema2.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
         assertEquals(keyValueSchema3.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
 
-        assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
+                SchemaType.JSON);
+        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
+                SchemaType.JSON);
+        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
+                SchemaType.JSON);
+        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
+                SchemaType.JSON);
+        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema3).getKeySchema().getSchemaInfo().getType(),
+                SchemaType.JSON);
+        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema3).getValueSchema().getSchemaInfo().getType(),
+                SchemaType.JSON);
+
+        String schemaInfo1 = new String(keyValueSchema1.getSchemaInfo().getSchema());
+        String schemaInfo2 = new String(keyValueSchema2.getSchemaInfo().getSchema());
+        String schemaInfo3 = new String(keyValueSchema3.getSchemaInfo().getSchema());
+        assertEquals(schemaInfo1, schemaInfo2);
+        assertEquals(schemaInfo1, schemaInfo3);
+    }
+
+    @Test
+    public void testNotAllowNullJsonSchemaCreate() {
+        JSONSchema<Foo> fooSchema = JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
+        JSONSchema<Bar> barSchema = JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).withAlwaysAllowNull(false).build());
+
+        Schema<KeyValue<Foo, Bar>> keyValueSchema1 = Schema.KeyValue(fooSchema, barSchema);
+        Schema<KeyValue<Foo, Bar>> keyValueSchema2 = Schema.KeyValue(JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build()),
+                JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).withAlwaysAllowNull(false).build()));
+
+        Schema<KeyValue<Foo, Bar>> keyValueSchema3 = Schema.KeyValue(JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build()),
+                JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).withAlwaysAllowNull(false).build()));
+
+        assertEquals(keyValueSchema1.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
+        assertEquals(keyValueSchema2.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
+        assertEquals(keyValueSchema3.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
+
+        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
             SchemaType.JSON);
-        assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
             SchemaType.JSON);
-        assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
             SchemaType.JSON);
-        assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
             SchemaType.JSON);
-        assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema3).getKeySchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema3).getKeySchema().getSchemaInfo().getType(),
             SchemaType.JSON);
-        assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema3).getValueSchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema3).getValueSchema().getSchemaInfo().getType(),
             SchemaType.JSON);
 
         String schemaInfo1 = new String(keyValueSchema1.getSchemaInfo().getSchema());
@@ -95,7 +155,7 @@ public class KeyValueSchemaTest {
     }
 
     @Test
-    public void testSchemaEncodeAndDecode() {
+    public void testAllowNullSchemaEncodeAndDecode() {
         Schema keyValueSchema = Schema.KeyValue(Foo.class, Bar.class);
 
         Bar bar = new Bar();
@@ -111,7 +171,7 @@ public class KeyValueSchemaTest {
         byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
         Assert.assertTrue(encodeBytes.length > 0);
 
-        KeyValue<Foo, Bar> keyValue = (KeyValue<Foo, Bar>)keyValueSchema.decode(encodeBytes);
+        KeyValue<Foo, Bar> keyValue = (KeyValue<Foo, Bar>) keyValueSchema.decode(encodeBytes);
         Foo fooBack = keyValue.getKey();
         Bar barBack = keyValue.getValue();
 
@@ -120,9 +180,64 @@ public class KeyValueSchemaTest {
     }
 
     @Test
-    public void testBytesSchemaEncodeAndDecode() {
-        AvroSchema<Foo> fooAvroSchema = AvroSchema.of(Foo.class);
-        AvroSchema<Bar> barAvroSchema = AvroSchema.of(Bar.class);
+    public void testNotAllowNullSchemaEncodeAndDecode() {
+        Schema keyValueSchema = Schema.KeyValue(JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build()),
+                JSONSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).withAlwaysAllowNull(false).build()));
+
+        Bar bar = new Bar();
+        bar.setField1(true);
+
+        Foo foo = new Foo();
+        foo.setField1("field1");
+        foo.setField2("field2");
+        foo.setField3(3);
+        foo.setField4(bar);
+        foo.setColor(Color.RED);
+
+        byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
+        Assert.assertTrue(encodeBytes.length > 0);
+
+        KeyValue<Foo, Bar> keyValue = (KeyValue<Foo, Bar>) keyValueSchema.decode(encodeBytes);
+        Foo fooBack = keyValue.getKey();
+        Bar barBack = keyValue.getValue();
+
+        assertEquals(foo, fooBack);
+        assertEquals(bar, barBack);
+    }
+
+    @Test
+    public void testAllowNullBytesSchemaEncodeAndDecode() {
+        AvroSchema<Foo> fooAvroSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        AvroSchema<Bar> barAvroSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+
+        Bar bar = new Bar();
+        bar.setField1(true);
+
+        Foo foo = new Foo();
+        foo.setField1("field1");
+        foo.setField2("field2");
+        foo.setField3(3);
+        foo.setField4(bar);
+        foo.setColor(Color.RED);
+        foo.setFieldUnableNull("notNull");
+
+        byte[] fooBytes = fooAvroSchema.encode(foo);
+        byte[] barBytes = barAvroSchema.encode(bar);
+
+        byte[] encodeBytes = Schema.KV_BYTES().encode(new KeyValue<>(fooBytes, barBytes));
+        KeyValue<byte[], byte[]> decodeKV = Schema.KV_BYTES().decode(encodeBytes);
+
+        Foo fooBack = fooAvroSchema.decode(decodeKV.getKey());
+        Bar barBack = barAvroSchema.decode(decodeKV.getValue());
+
+        assertEquals(foo, fooBack);
+        assertEquals(bar, barBack);
+    }
+
+    @Test
+    public void testNotAllowNullBytesSchemaEncodeAndDecode() {
+        AvroSchema<Foo> fooAvroSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).withAlwaysAllowNull(false).build());
+        AvroSchema<Bar> barAvroSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).withAlwaysAllowNull(false).build());
 
         Bar bar = new Bar();
         bar.setField1(true);
@@ -133,6 +248,7 @@ public class KeyValueSchemaTest {
         foo.setField3(3);
         foo.setField4(bar);
         foo.setColor(Color.RED);
+        foo.setFieldUnableNull("notNull");
 
         byte[] fooBytes = fooAvroSchema.encode(foo);
         byte[] barBytes = barAvroSchema.encode(bar);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
index 4bc0c04..456416f 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaBuilderTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl.schema;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.testng.Assert.assertEquals;
 
+import org.apache.avro.reflect.Nullable;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericSchema;
@@ -36,11 +37,17 @@ import org.testng.annotations.Test;
 public class SchemaBuilderTest {
 
     private static class AllOptionalFields {
+        @Nullable
         private Integer intField;
+        @Nullable
         private Long longField;
+        @Nullable
         private String stringField;
+        @Nullable
         private Boolean boolField;
+        @Nullable
         private Float floatField;
+        @Nullable
         private Double doubleField;
     }
 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaTestUtils.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaTestUtils.java
index 0081f0e..98d5367 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaTestUtils.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaTestUtils.java
@@ -23,6 +23,8 @@ import java.util.List;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
+import org.apache.avro.reflect.AvroDefault;
+import org.apache.avro.reflect.Nullable;
 
 /**
  * Utils for testing avro.
@@ -33,11 +35,17 @@ public class SchemaTestUtils {
     @ToString
     @EqualsAndHashCode
     public static class Foo {
+        @Nullable
         private String field1;
+        @Nullable
         private String field2;
         private int field3;
+        @Nullable
         private Bar field4;
+        @Nullable
         private Color color;
+        @AvroDefault("\"defaultValue\"")
+        private String fieldUnableNull;
     }
 
     @Data
@@ -87,21 +95,35 @@ public class SchemaTestUtils {
         private Foo foo2;
     }
 
-    public static final String SCHEMA_JSON
-            = "{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache.pulsar.client.impl.schema" +
-            ".SchemaTestUtils$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\",\"string\"],\"default\":null}," +
-            "{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"field3\"," +
-            "\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Bar\"," +
-            "\"fields\":[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null},{\"name\":\"color\"," +
-            "\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"Color\",\"symbols\":[\"RED\",\"BLUE\"]}]," +
-            "\"default\":null}]}";
+    public static final String SCHEMA_AVRO_NOT_ALLOW_NULL
+            = "{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache.pulsar.client.impl.schema.SchemaTestUtils$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\",\"string\"]," +
+            "\"default\":null},{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\",{\"type\":" +
+            "\"record\",\"name\":\"Bar\",\"fields\":[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null},{\"name\":\"color\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"Color\"," +
+            "\"symbols\":[\"RED\",\"BLUE\"]}],\"default\":null},{\"name\":\"fieldUnableNull\",\"type\":\"string\",\"default\":\"defaultValue\"}]}";
+
+    public static final String SCHEMA_AVRO_ALLOW_NULL = "{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache.pulsar.client.impl.schema.SchemaTestUtils$\",\"fields\":[{\"name\":\"field1\"," +
+            "\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"" +
+            "null\",{\"type\":\"record\",\"name\":\"Bar\",\"fields\":[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null},{\"name\":\"color\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"Color\"" +
+            ",\"symbols\":[\"RED\",\"BLUE\"]}],\"default\":null},{\"name\":\"fieldUnableNull\",\"type\":[\"null\",\"string\"],\"default\":\"defaultValue\"}]}";
+
+    public static final String SCHEMA_JSON_NOT_ALLOW_NULL
+            = "{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache.pulsar.client.impl.schema.SchemaTestUtils$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\"" +
+            ":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Bar\",\"fields\":[{\"name\":\"" +
+            "field1\",\"type\":\"boolean\"}]}],\"default\":null},{\"name\":\"color\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"Color\",\"symbols\":[\"RED\",\"BLUE\"]}],\"default\":null},{\"name\":\"fieldUnableNull\"," +
+            "\"type\":\"string\",\"default\":\"defaultValue\"}]}";
+    public static final String SCHEMA_JSON_ALLOW_NULL
+            = "{\"type\":\"record\",\"name\":\"Foo\",\"namespace\":\"org.apache.pulsar.client.impl.schema.SchemaTestUtils$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\",\"string\"],\"default\":null}," +
+            "{\"name\":\"field2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"field3\",\"type\":\"int\"},{\"name\":\"field4\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Bar\",\"fields\":" +
+            "[{\"name\":\"field1\",\"type\":\"boolean\"}]}],\"default\":null},{\"name\":\"color\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"Color\",\"symbols\":[\"RED\",\"BLUE\"]}],\"default\":null},{\"name\":" +
+            "\"fieldUnableNull\",\"type\":[\"null\",\"string\"],\"default\":\"defaultValue\"}]}";
 
     public static String[] FOO_FIELDS = {
             "field1",
             "field2",
             "field3",
             "field4",
-            "color"
+            "color",
+            "fieldUnableNull"
     };
 
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
index fc65d70..ed554cc 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
@@ -77,7 +77,7 @@ public class GenericSchemaImplTest {
             Bar bar = new Bar();
             bar.setField1(i % 2 == 0);
             foo.setField4(bar);
-
+            foo.setFieldUnableNull("fieldUnableNull-1-" + i);
             byte[] data = encodeSchema.encode(foo);
 
             log.info("Decoding : {}", new String(data, UTF_8));
@@ -93,6 +93,8 @@ public class GenericSchemaImplTest {
             assertTrue(field4 instanceof GenericRecord);
             GenericRecord field4Record = (GenericRecord) field4;
             assertEquals(i % 2 == 0, field4Record.getField("field1"));
+            Object fieldUnableNull = record.getField("fieldUnableNull");
+            assertEquals("fieldUnableNull-1-" + i, fieldUnableNull, "fieldUnableNull 1 is " + fieldUnableNull.getClass());
         }
     }
 
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProviderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProviderTest.java
index 884a674..aedc0bd 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProviderTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProviderTest.java
@@ -24,10 +24,12 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 
 import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.LookupService;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils;
+import org.apache.pulsar.client.tutorial.JsonPojo;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.testng.annotations.BeforeMethod;
@@ -54,7 +56,7 @@ public class MultiVersionGenericSchemaProviderTest {
     @Test
     public void testGetSchema() {
         CompletableFuture<Optional<SchemaInfo>> completableFuture = new CompletableFuture<>();
-        SchemaInfo schemaInfo = AvroSchema.of(SchemaTestUtils.Foo.class).getSchemaInfo();
+        SchemaInfo schemaInfo = AvroSchema.of(SchemaDefinition.<SchemaTestUtils>builder().withPojo(SchemaTestUtils.class).build()).getSchemaInfo();
         completableFuture.complete(Optional.of(schemaInfo));
         when(schemaProvider.getPulsarClient().getLookup()
                 .getSchema(
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleAsyncProducerWithSchema.java b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleAsyncProducerWithSchema.java
index 90758b6..c838244 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleAsyncProducerWithSchema.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleAsyncProducerWithSchema.java
@@ -27,15 +27,15 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
-
 @Slf4j
 public class SampleAsyncProducerWithSchema {
 
     public static void main(String[] args) throws IOException {
         PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("http://localhost:8080").build();
 
-        Producer<JsonPojo> producer = pulsarClient.newProducer(JSONSchema.of(JsonPojo.class)).topic("persistent://my-property/use/my-ns/my-topic")
+        Producer<JsonPojo> producer = pulsarClient.newProducer(JSONSchema.of(SchemaDefinition.<JsonPojo>builder().withPojo(JsonPojo.class).build())).topic("persistent://my-property/use/my-ns/my-topic")
                 .sendTimeout(3, TimeUnit.SECONDS).create();
 
         List<CompletableFuture<MessageId>> futures = Lists.newArrayList();
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleConsumerWithSchema.java b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleConsumerWithSchema.java
index 3780332..1708b84 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleConsumerWithSchema.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/tutorial/SampleConsumerWithSchema.java
@@ -23,6 +23,7 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 
 public class SampleConsumerWithSchema {
@@ -30,7 +31,8 @@ public class SampleConsumerWithSchema {
 
         PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("http://localhost:8080").build();
 
-        Consumer<JsonPojo> consumer = pulsarClient.newConsumer(JSONSchema.of(JsonPojo.class)) //
+        Consumer<JsonPojo> consumer = pulsarClient.newConsumer(JSONSchema.of
+                (SchemaDefinition.<JsonPojo>builder().withPojo(JsonPojo.class).build())) //
                 .topic("persistent://my-property/use/my-ns/my-topic") //
                 .subscriptionName("my-subscription-name").subscribe();
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
index d8abcc6..2ac42f4 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/TopicSchema.java
@@ -27,6 +27,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
@@ -124,10 +125,10 @@ public class TopicSchema {
             return (Schema<T>) Schema.STRING;
 
         case AVRO:
-            return AvroSchema.of(clazz);
+            return AvroSchema.of(SchemaDefinition.<T>builder().withPojo(clazz).build());
 
         case JSON:
-            return JSONSchema.of(clazz);
+            return JSONSchema.of(SchemaDefinition.<T>builder().withPojo(clazz).build());
 
         case KEY_VALUE:
             return (Schema<T>)Schema.KV_BYTES();
diff --git a/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java b/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java
index 0478775..984f658 100644
--- a/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java
+++ b/pulsar-io/hbase/src/test/java/org/apache/pulsar/io/hbase/sink/HbaseGenericRecordSinkTest.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
@@ -108,7 +109,7 @@ public class HbaseGenericRecordSinkTest {
         obj.setAddress("address_value");
         obj.setAge(30);
         obj.setFlag(true);
-        AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
+        AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
 
         byte[] bytes = schema.encode(obj);
         ByteBuf payload = Unpooled.copiedBuffer(bytes);
diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
index 7cbaa57..9ecc91a 100644
--- a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
@@ -29,6 +29,7 @@ import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
@@ -95,7 +96,7 @@ public class JdbcSinkTest {
         obj.setField1("ValueOfField1");
         obj.setField2("ValueOfField1");
         obj.setField3(3);
-        AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
+        AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
 
         byte[] bytes = schema.encode(obj);
         ByteBuf payload = Unpooled.copiedBuffer(bytes);
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
index b6eade2..7a62f67 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnector.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.admin.Schemas;
 import org.apache.pulsar.client.admin.Tenants;
 import org.apache.pulsar.client.admin.Topics;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.common.api.Commands;
@@ -219,21 +220,21 @@ public abstract class TestPulsarConnector {
             partitionedTopicsToPartitions.put(PARTITIONED_TOPIC_6.toString(), 7);
 
             topicsToSchemas = new HashMap<>();
-            topicsToSchemas.put(TOPIC_1.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-            topicsToSchemas.put(TOPIC_2.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-            topicsToSchemas.put(TOPIC_3.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-            topicsToSchemas.put(TOPIC_4.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-            topicsToSchemas.put(TOPIC_5.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-            topicsToSchemas.put(TOPIC_6.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(TOPIC_1.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
+            topicsToSchemas.put(TOPIC_2.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
+            topicsToSchemas.put(TOPIC_3.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
+            topicsToSchemas.put(TOPIC_4.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
+            topicsToSchemas.put(TOPIC_5.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
+            topicsToSchemas.put(TOPIC_6.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
 
 
-            topicsToSchemas.put(PARTITIONED_TOPIC_1.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(PARTITIONED_TOPIC_1.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
 
-            topicsToSchemas.put(PARTITIONED_TOPIC_2.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-            topicsToSchemas.put(PARTITIONED_TOPIC_3.getSchemaName(), AvroSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-            topicsToSchemas.put(PARTITIONED_TOPIC_4.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-            topicsToSchemas.put(PARTITIONED_TOPIC_5.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
-            topicsToSchemas.put(PARTITIONED_TOPIC_6.getSchemaName(), JSONSchema.of(TestPulsarMetadata.Foo.class).getSchemaInfo());
+            topicsToSchemas.put(PARTITIONED_TOPIC_2.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
+            topicsToSchemas.put(PARTITIONED_TOPIC_3.getSchemaName(), AvroSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
+            topicsToSchemas.put(PARTITIONED_TOPIC_4.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
+            topicsToSchemas.put(PARTITIONED_TOPIC_5.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
+            topicsToSchemas.put(PARTITIONED_TOPIC_6.getSchemaName(), JSONSchema.of(SchemaDefinition.builder().withPojo(TestPulsarMetadata.Foo.class).build()).getSchemaInfo());
 
             fooTypes = new HashMap<>();
             fooTypes.put("field1", IntegerType.INTEGER);
@@ -622,7 +623,7 @@ public abstract class TestPulsarConnector {
                     .setProducerName("test-producer").setSequenceId(i)
                     .setPublishTime(currentTimeMs + i).build();
 
-            Schema schema = topicsToSchemas.get(topicSchemaName).getType() == SchemaType.AVRO ? AvroSchema.of(Foo.class) : JSONSchema.of(Foo.class);
+            Schema schema = topicsToSchemas.get(topicSchemaName).getType() == SchemaType.AVRO ? AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build()) : JSONSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
 
             org.apache.pulsar.shade.io.netty.buffer.ByteBuf payload = org.apache.pulsar.shade.io.netty.buffer.Unpooled
                     .copiedBuffer(schema.encode(foo));
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
index 72d9b01..da4331a 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
@@ -33,6 +33,7 @@ import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testcontainers.containers.MySQLContainer;
@@ -60,7 +61,7 @@ public class JdbcSinkTester extends SinkTester<MySQLContainer> {
     private static final String NAME = "jdbc";
     private static final String MYSQL = "mysql";
 
-    private AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
+    private AvroSchema<Foo> schema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
     private String tableName = "test";
     private Connection connection;