You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2021/11/09 13:39:49 UTC

[pulsar] branch branch-2.9 updated (a8a207b -> f849d46)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a change to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from a8a207b  Fix call sync method in an async callback when enabling geo replicator. (#12590)
     new 4939204  Pulsar Client: restore SchemaInfo.builder() API (#12673)
     new fff54d5  [Transaction] Fix close pulsarClient then close transaction client connection (#12689)
     new f849d46  [Java Client] Remove invalid call to Thread.currentThread().interrupt(); (#12652)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../admin/AdminApiSchemaValidationEnforced.java    |  8 ++--
 .../schema/JsonSchemaCompatibilityCheckTest.java   |  3 +-
 ...Test.java => TransactionClientConnectTest.java} | 29 ++++++++++++-
 .../broker/transaction/TransactionTestBase.java    |  2 +-
 .../SchemaCompatibilityCheckTest.java              |  5 +--
 .../pulsar/client/admin/internal/SchemasImpl.java  |  3 +-
 .../PulsarClientImplementationBinding.java         |  2 +
 .../apache/pulsar/common/schema/SchemaInfo.java    | 48 ++++++++++++++++++++++
 .../pulsar/client/impl/ConnectionHandler.java      | 16 +++++---
 .../apache/pulsar/client/impl/HandlerState.java    |  7 ++++
 .../apache/pulsar/client/impl/ProducerImpl.java    |  1 -
 .../pulsar/client/impl/PulsarClientImpl.java       | 16 ++++----
 .../PulsarClientImplementationBindingImpl.java     | 34 +++------------
 .../client/impl/TransactionMetaStoreHandler.java   |  7 ++++
 .../pulsar/client/impl/schema/SchemaInfoTest.java  |  6 +--
 .../pulsar/common/protocol/schema/SchemaData.java  |  3 +-
 .../connect/schema/KafkaSchemaWrappedSchema.java   |  3 +-
 .../apache/pulsar/io/kafka/AvroSchemaCache.java    |  3 +-
 .../org/apache/pulsar/sql/presto/PulsarSplit.java  |  3 +-
 .../pulsar/sql/presto/TestPulsarMetadata.java      |  4 +-
 .../decoder/primitive/TestPrimitiveDecoder.java    | 24 +++++------
 21 files changed, 143 insertions(+), 84 deletions(-)
 rename pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/{TransactionClientReconnectTest.java => TransactionClientConnectTest.java} (89%)

[pulsar] 02/03: [Transaction] Fix close pulsarClient then close transaction client connection (#12689)

Posted by eo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fff54d5593471bf38de915d9f72bae5e988226a7
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Tue Nov 9 19:34:14 2021 +0800

    [Transaction] Fix close pulsarClient then close transaction client connection (#12689)
    
    (cherry picked from commit 6162ccf34aa29e9495bf2f0cdc7e88e9e8c1d067)
---
 ...Test.java => TransactionClientConnectTest.java} | 29 +++++++++++++++++++++-
 .../broker/transaction/TransactionTestBase.java    |  2 +-
 .../pulsar/client/impl/ConnectionHandler.java      | 16 +++++++-----
 .../apache/pulsar/client/impl/HandlerState.java    |  7 ++++++
 .../pulsar/client/impl/PulsarClientImpl.java       | 16 ++++++------
 .../client/impl/TransactionMetaStoreHandler.java   |  7 ++++++
 6 files changed, 61 insertions(+), 16 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientConnectTest.java
similarity index 89%
rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java
rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientConnectTest.java
index 1f5ab15..42eadfe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientReconnectTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionClientConnectTest.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.TransactionMetaStoreHandler;
 import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
@@ -39,6 +40,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.util.Collections;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -47,7 +49,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.FileAssert.fail;
 
-public class TransactionClientReconnectTest extends TransactionTestBase {
+public class TransactionClientConnectTest extends TransactionTestBase {
 
     private static final String RECONNECT_TOPIC = "persistent://public/txn/txn-client-reconnect-test";
     private static final int NUM_PARTITIONS = 1;
@@ -223,6 +225,31 @@ public class TransactionClientReconnectTest extends TransactionTestBase {
         reconnect();
     }
 
+    @Test
+    public void testPulsarClientCloseThenCloseTcClient() throws Exception {
+        TransactionCoordinatorClientImpl transactionCoordinatorClient = ((PulsarClientImpl) pulsarClient).getTcClient();
+        Field field = TransactionCoordinatorClientImpl.class.getDeclaredField("handlers");
+        field.setAccessible(true);
+        TransactionMetaStoreHandler[] handlers =
+                (TransactionMetaStoreHandler[]) field.get(transactionCoordinatorClient);
+
+        for (TransactionMetaStoreHandler handler : handlers) {
+            handler.newTransactionAsync(10, TimeUnit.SECONDS).get();
+        }
+        pulsarClient.close();
+        for (TransactionMetaStoreHandler handler : handlers) {
+            Method method = TransactionMetaStoreHandler.class.getMethod("getConnectHandleState");
+            method.setAccessible(true);
+            assertEquals(method.invoke(handler).toString(), "Closed");
+            try {
+                handler.newTransactionAsync(10, TimeUnit.SECONDS).get();
+            } catch (ExecutionException | InterruptedException e) {
+                assertTrue(e.getCause()
+                        instanceof TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException);
+            }
+        }
+    }
+
     public void start() throws Exception {
         // wait transaction coordinator init success
         Awaitility.await().until(() -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
index 622421b..1dba73a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java
@@ -246,7 +246,7 @@ public abstract class TransactionTestBase extends TestRetrySupport {
                 admin = null;
             }
             if (pulsarClient != null) {
-                pulsarClient.shutdown();
+                pulsarClient.close();
                 pulsarClient = null;
             }
             if (pulsarServiceList.size() > 0) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 8fb7ab4..9babd2a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -103,12 +103,16 @@ public class ConnectionHandler {
         long delayMs = backoff.next();
         log.warn("[{}] [{}] Could not get connection to broker: {} -- Will try again in {} s", state.topic, state.getHandlerName(),
                 exception.getMessage(), delayMs / 1000.0);
-        state.setState(State.Connecting);
-        state.client.timer().newTimeout(timeout -> {
-            log.info("[{}] [{}] Reconnecting after connection was closed", state.topic, state.getHandlerName());
-            incrementEpoch();
-            grabCnx();
-        }, delayMs, TimeUnit.MILLISECONDS);
+        if (state.changeToConnecting()) {
+            state.client.timer().newTimeout(timeout -> {
+                log.info("[{}] [{}] Reconnecting after connection was closed", state.topic, state.getHandlerName());
+                incrementEpoch();
+                grabCnx();
+            }, delayMs, TimeUnit.MILLISECONDS);
+        } else {
+            log.info("[{}] [{}] Ignoring reconnection request (state: {})",
+                    state.topic, state.getHandlerName(), state.getState());
+        }
     }
 
     protected long incrementEpoch() {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
index e72c97f..582df8c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
@@ -64,6 +64,13 @@ abstract class HandlerState {
         return STATE_UPDATER.get(this);
     }
 
+    protected boolean changeToConnecting() {
+        return (STATE_UPDATER.compareAndSet(this, State.Uninitialized, State.Connecting)
+                || STATE_UPDATER.compareAndSet(this, State.Ready, State.Connecting)
+                || STATE_UPDATER.compareAndSet(this, State.RegisteringSchema, State.Connecting)
+                || STATE_UPDATER.compareAndSet(this, State.Connecting, State.Connecting));
+    }
+
     protected void setState(State s) {
         STATE_UPDATER.set(this, s);
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 1234b8b..7703afc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -712,6 +712,14 @@ public class PulsarClientImpl implements PulsarClient {
                     throwable = t;
                 }
             }
+            if (tcClient != null) {
+                try {
+                    tcClient.close();
+                } catch (Throwable t) {
+                    log.warn("Failed to close tcClient");
+                    throwable = t;
+                }
+            }
             try {
                 // Shutting down eventLoopGroup separately because in some cases, cnxPool might be using different
                 // eventLoopGroup.
@@ -747,14 +755,6 @@ public class PulsarClientImpl implements PulsarClient {
                     throwable = t;
                 }
             }
-            if (tcClient != null) {
-                try {
-                    tcClient.close();
-                } catch (Throwable t) {
-                    log.warn("Failed to close tcClient");
-                    throwable = t;
-                }
-            }
             if (throwable != null) {
                 throw throwable;
             }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index f96cf57..ba6ee50 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.Recycler;
 import io.netty.util.ReferenceCountUtil;
@@ -540,6 +541,12 @@ public class TransactionMetaStoreHandler extends HandlerState implements Connect
     @Override
     public void close() throws IOException {
         this.requestTimeout.cancel();
+        this.setState(State.Closed);
+    }
+
+    @VisibleForTesting
+    public State getConnectHandleState() {
+        return getState();
     }
 
     @Override

[pulsar] 03/03: [Java Client] Remove invalid call to Thread.currentThread().interrupt(); (#12652)

Posted by eo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f849d467689ed6e3e30438722e3ebc6779d059ae
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Sun Nov 7 01:50:13 2021 +0200

    [Java Client] Remove invalid call to Thread.currentThread().interrupt(); (#12652)
    
    - Thread.currentThread().interrupt() shouldn't be called here.
      - it must only be called when handling an InterruptedException.
      - this looks like a copy-paste bug introduced in
        https://github.com/apache/pulsar/pull/731/files#diff-d6fcf8aa2d0035cc386dca0942a452343d6854763c7fd397efa4e660c0069767R1183
    
    (cherry picked from commit b5f78f8ae5b4f2419e97044541722062fe388d9d)
---
 .../src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java        | 1 -
 1 file changed, 1 deletion(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 0f27a63..34e4b6c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1775,7 +1775,6 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                     processOpSendMsg(opSendMsg);
                 }
             } catch (PulsarClientException e) {
-                Thread.currentThread().interrupt();
                 semaphore.ifPresent(s -> s.release(batchMessageContainer.getNumMessagesInBatch()));
             } catch (Throwable t) {
                 semaphore.ifPresent(s -> s.release(batchMessageContainer.getNumMessagesInBatch()));

[pulsar] 01/03: Pulsar Client: restore SchemaInfo.builder() API (#12673)

Posted by eo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 4939204baad8241c3fb1a92d35d3ac7650754e04
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Tue Nov 9 14:31:33 2021 +0100

    Pulsar Client: restore SchemaInfo.builder() API (#12673)
    
    (cherry picked from commit 849e4dc5fa59b774287436a054efb17d198054b4)
---
 .../admin/AdminApiSchemaValidationEnforced.java    |  8 ++--
 .../schema/JsonSchemaCompatibilityCheckTest.java   |  3 +-
 .../SchemaCompatibilityCheckTest.java              |  5 +--
 .../pulsar/client/admin/internal/SchemasImpl.java  |  3 +-
 .../PulsarClientImplementationBinding.java         |  2 +
 .../apache/pulsar/common/schema/SchemaInfo.java    | 48 ++++++++++++++++++++++
 .../PulsarClientImplementationBindingImpl.java     | 34 +++------------
 .../pulsar/client/impl/schema/SchemaInfoTest.java  |  6 +--
 .../pulsar/common/protocol/schema/SchemaData.java  |  3 +-
 .../connect/schema/KafkaSchemaWrappedSchema.java   |  3 +-
 .../apache/pulsar/io/kafka/AvroSchemaCache.java    |  3 +-
 .../org/apache/pulsar/sql/presto/PulsarSplit.java  |  3 +-
 .../pulsar/sql/presto/TestPulsarMetadata.java      |  4 +-
 .../decoder/primitive/TestPrimitiveDecoder.java    | 24 +++++------
 14 files changed, 82 insertions(+), 67 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java
index 3daf920..b7747de 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSchemaValidationEnforced.java
@@ -30,9 +30,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.protocol.schema.PostSchemaPayload;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -98,7 +96,7 @@ public class AdminApiSchemaValidationEnforced extends MockedPulsarServiceBaseTes
             assertTrue(e.getMessage().contains("HTTP 404 Not Found"));
         }
         Map<String, String> properties = Maps.newHashMap();
-        SchemaInfo schemaInfo = SchemaInfoImpl.builder()
+        SchemaInfo schemaInfo = SchemaInfo.builder()
                 .type(SchemaType.STRING)
                 .properties(properties)
                 .name("test")
@@ -147,7 +145,7 @@ public class AdminApiSchemaValidationEnforced extends MockedPulsarServiceBaseTes
         }
         Map<String, String> properties = Maps.newHashMap();
         properties.put("key1", "value1");
-        SchemaInfo schemaInfo = SchemaInfoImpl.builder()
+        SchemaInfo schemaInfo = SchemaInfo.builder()
                 .type(SchemaType.STRING)
                 .properties(properties)
                 .name("test")
@@ -177,7 +175,7 @@ public class AdminApiSchemaValidationEnforced extends MockedPulsarServiceBaseTes
         }
         admin.namespaces().setSchemaValidationEnforced(namespace,true);
         Map<String, String> properties = Maps.newHashMap();
-        SchemaInfo schemaInfo = SchemaInfoImpl.builder()
+        SchemaInfo schemaInfo = SchemaInfo.builder()
                 .type(SchemaType.STRING)
                 .properties(properties)
                 .name("test")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
index 32a9f9e..9bf0189 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java
@@ -33,7 +33,6 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
-import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -119,7 +118,7 @@ public class JsonSchemaCompatibilityCheckTest extends BaseAvroSchemaCompatibilit
             JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(mapper);
             JsonSchema schema = schemaGen.generateSchema(pojo);
 
-            SchemaInfo info = SchemaInfoImpl.builder()
+            SchemaInfo info = SchemaInfo.builder()
                     .name("")
                     .properties(properties)
                     .type(SchemaType.JSON)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
index 8def5dc..02913c6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java
@@ -35,15 +35,12 @@ import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
-import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.schema.Schemas;
@@ -324,7 +321,7 @@ public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest {
                 SchemaCompatibilityStrategy.FULL);
         byte[] changeSchemaBytes = (new String(Schema.AVRO(Schemas.PersonOne.class)
                 .getSchemaInfo().getSchema(), UTF_8) + "/n   /n   /n").getBytes();
-        SchemaInfo schemaInfo = SchemaInfoImpl.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build();
+        SchemaInfo schemaInfo = SchemaInfo.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build();
         admin.schemas().createSchema(fqtn, schemaInfo);
 
         admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
index a072acd..54e731e 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SchemasImpl.java
@@ -32,7 +32,6 @@ import javax.ws.rs.client.WebTarget;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Schemas;
 import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.schema.DeleteSchemaResponse;
@@ -449,7 +448,7 @@ public class SchemasImpl extends BaseResource implements Schemas {
             schema = response.getData().getBytes(UTF_8);
         }
 
-        return SchemaInfoImpl.builder()
+        return SchemaInfo.builder()
                 .schema(schema)
                 .type(response.getType())
                 .properties(response.getProperties())
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
index f7bcf05..c8c300c 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
@@ -251,4 +251,6 @@ public interface PulsarClientImplementationBinding {
         byteBuffer.get(array);
         return array;
     }
+
+    SchemaInfo newSchemaInfoImpl(String name, byte[] schema, SchemaType type, Map<String, String> propertiesValue);
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
index 01ba746..e05b0d2 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pulsar.common.schema;
 
+import java.util.Collections;
 import java.util.Map;
 
+import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.classification.InterfaceStability;
 
@@ -48,4 +50,50 @@ public interface SchemaInfo {
     Map<String, String> getProperties();
 
     String getSchemaDefinition();
+
+    static SchemaInfoBuilder builder() {
+        return new SchemaInfoBuilder();
+    }
+
+    class SchemaInfoBuilder {
+        private String name;
+        private byte[] schema;
+        private SchemaType type;
+        private Map<String, String> properties;
+        private boolean propertiesSet;
+
+        SchemaInfoBuilder() {
+        }
+
+        public SchemaInfoBuilder name(String name) {
+            this.name = name;
+            return this;
+        }
+
+        public SchemaInfoBuilder schema(byte[] schema) {
+            this.schema = schema;
+            return this;
+        }
+
+        public SchemaInfoBuilder type(SchemaType type) {
+            this.type = type;
+            return this;
+        }
+
+        public SchemaInfoBuilder properties(Map<String, String> properties) {
+            this.properties = properties;
+            this.propertiesSet = true;
+            return this;
+        }
+
+        public SchemaInfo build() {
+            Map<String, String> propertiesValue = this.properties;
+            if (!this.propertiesSet) {
+                propertiesValue = Collections.emptyMap();
+            }
+            return DefaultImplementation
+                    .getDefaultImplementation()
+                    .newSchemaInfoImpl(name, schema, type, propertiesValue);
+        }
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
index c146f23..c7f3af9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
@@ -45,35 +45,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.api.schema.SchemaDefinitionBuilder;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
-import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
-import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
-import org.apache.pulsar.client.impl.schema.AvroSchema;
-import org.apache.pulsar.client.impl.schema.BooleanSchema;
-import org.apache.pulsar.client.impl.schema.ByteBufferSchema;
-import org.apache.pulsar.client.impl.schema.ByteSchema;
-import org.apache.pulsar.client.impl.schema.BytesSchema;
-import org.apache.pulsar.client.impl.schema.DateSchema;
-import org.apache.pulsar.client.impl.schema.DoubleSchema;
-import org.apache.pulsar.client.impl.schema.FloatSchema;
-import org.apache.pulsar.client.impl.schema.InstantSchema;
-import org.apache.pulsar.client.impl.schema.IntSchema;
-import org.apache.pulsar.client.impl.schema.JSONSchema;
-import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
-import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
-import org.apache.pulsar.client.impl.schema.LocalDateSchema;
-import org.apache.pulsar.client.impl.schema.LocalDateTimeSchema;
-import org.apache.pulsar.client.impl.schema.LocalTimeSchema;
-import org.apache.pulsar.client.impl.schema.LongSchema;
-import org.apache.pulsar.client.impl.schema.NativeAvroBytesSchema;
-import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema;
-import org.apache.pulsar.client.impl.schema.ProtobufSchema;
-import org.apache.pulsar.client.impl.schema.RecordSchemaBuilderImpl;
-import org.apache.pulsar.client.impl.schema.SchemaDefinitionBuilderImpl;
-import org.apache.pulsar.client.impl.schema.SchemaUtils;
-import org.apache.pulsar.client.impl.schema.ShortSchema;
-import org.apache.pulsar.client.impl.schema.StringSchema;
-import org.apache.pulsar.client.impl.schema.TimeSchema;
-import org.apache.pulsar.client.impl.schema.TimestampSchema;
+import org.apache.pulsar.client.impl.schema.*;
 import org.apache.pulsar.client.impl.schema.generic.GenericProtobufNativeSchema;
 import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
 import org.apache.pulsar.client.internal.PulsarClientImplementationBinding;
@@ -383,4 +355,8 @@ public final class PulsarClientImplementationBindingImpl implements PulsarClient
     public MessagePayloadFactory newDefaultMessagePayloadFactory() {
         return new MessagePayloadFactoryImpl();
     }
+
+    public SchemaInfo newSchemaInfoImpl(String name, byte[] schema, SchemaType type, Map<String, String> propertiesValue) {
+        return new SchemaInfoImpl(name, schema, type, propertiesValue);
+    }
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java
index f96e84e..719704c 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SchemaInfoTest.java
@@ -289,7 +289,7 @@ public class SchemaInfoTest {
 
         @Test
         public void testUnsetProperties() {
-            final SchemaInfo schemaInfo = SchemaInfoImpl.builder()
+            final SchemaInfo schemaInfo = SchemaInfo.builder()
                     .type(SchemaType.STRING)
                     .schema(new byte[0])
                     .name("string")
@@ -305,7 +305,7 @@ public class SchemaInfoTest {
         public void testSetProperties() {
             final Map<String, String> map = Maps.newHashMap();
             map.put("test", "value");
-            final SchemaInfo schemaInfo = SchemaInfoImpl.builder()
+            final SchemaInfo schemaInfo = SchemaInfo.builder()
                     .type(SchemaType.STRING)
                     .schema(new byte[0])
                     .name("string")
@@ -323,7 +323,7 @@ public class SchemaInfoTest {
             final Map<String, String> map = new HashMap<>();
             map.put("key", null);
 
-            SchemaInfo si = SchemaInfoImpl.builder()
+            SchemaInfo si = SchemaInfo.builder()
                     .name("INT32")
                     .schema(new byte[0])
                     .type(SchemaType.INT32)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java
index d5b4405..5c00f06 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/schema/SchemaData.java
@@ -22,7 +22,6 @@ import java.util.HashMap;
 import java.util.Map;
 import lombok.Builder;
 import lombok.Data;
-import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
@@ -46,7 +45,7 @@ public class SchemaData {
      * @return the converted schema info.
      */
     public SchemaInfo toSchemaInfo() {
-        return SchemaInfoImpl.builder()
+        return SchemaInfo.builder()
             .name("")
             .type(type)
             .schema(data)
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java
index ba57692..2db9d6c 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java
@@ -27,7 +27,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -45,7 +44,7 @@ public class KafkaSchemaWrappedSchema implements Schema<byte[]>, Serializable {
         Map<String, String> props = new HashMap<>();
         boolean isJsonConverter = converter instanceof JsonConverter;
         props.put(GenericAvroSchema.OFFSET_PROP, isJsonConverter ? "0" : "5");
-        this.schemaInfo = SchemaInfoImpl.builder()
+        this.schemaInfo = SchemaInfo.builder()
                 .name(isJsonConverter? "KafKaJson" : "KafkaAvro")
                 .type(isJsonConverter ? SchemaType.JSON : SchemaType.AVRO)
                 .schema(schema.toString().getBytes(UTF_8))
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
index 2a9e1c4..eda8c96 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/AvroSchemaCache.java
@@ -25,7 +25,6 @@ import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 
@@ -66,7 +65,7 @@ final class AvroSchemaCache {
             org.apache.avro.Schema schema = schemaRegistryClient.getById(schemaId);
             String definition = schema.toString(false);
             log.info("Schema {} definition {}", schemaId, definition);
-            SchemaInfo schemaInfo = SchemaInfoImpl.builder()
+            SchemaInfo schemaInfo = SchemaInfo.builder()
                     .type(SchemaType.AVRO)
                     .name(schema.getName())
                     .properties(Collections.emptyMap())
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
index 645edbd..03a6b77 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java
@@ -32,7 +32,6 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
 import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -102,7 +101,7 @@ public class PulsarSplit implements ConnectorSplit {
         this.offloadPolicies = offloadPolicies;
 
         ObjectMapper objectMapper = new ObjectMapper();
-        this.schemaInfo = SchemaInfoImpl.builder()
+        this.schemaInfo = SchemaInfo.builder()
                 .name(originSchemaName)
                 .type(schemaType)
                 .schema(schema.getBytes("ISO8859-1"))
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
index 79fb789..26199ba 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarMetadata.java
@@ -189,7 +189,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
     @Test(dataProvider = "rewriteNamespaceDelimiter", singleThreaded = true)
     public void testGetTableMetadataTableBlankSchema(String delimiter) throws PulsarAdminException {
         updateRewriteNamespaceDelimiterIfNeeded(delimiter);
-        SchemaInfo badSchemaInfo = SchemaInfoImpl.builder()
+        SchemaInfo badSchemaInfo = SchemaInfo.builder()
                 .schema(new byte[0])
                 .type(SchemaType.AVRO)
                 .build();
@@ -216,7 +216,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
     @Test(dataProvider = "rewriteNamespaceDelimiter", singleThreaded = true)
     public void testGetTableMetadataTableInvalidSchema(String delimiter) throws PulsarAdminException {
         updateRewriteNamespaceDelimiterIfNeeded(delimiter);
-        SchemaInfo badSchemaInfo = SchemaInfoImpl.builder()
+        SchemaInfo badSchemaInfo = SchemaInfo.builder()
                 .schema("foo".getBytes())
                 .type(SchemaType.AVRO)
                 .build();
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/TestPrimitiveDecoder.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/TestPrimitiveDecoder.java
index c1b97d3..d01210b 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/TestPrimitiveDecoder.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/primitive/TestPrimitiveDecoder.java
@@ -66,7 +66,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester {
     public void testPrimitiveType() {
 
         byte int8Value = 1;
-        SchemaInfo schemaInfoInt8 = SchemaInfoImpl.builder().type(SchemaType.INT8).build();
+        SchemaInfo schemaInfoInt8 = SchemaInfo.builder().type(SchemaType.INT8).build();
         Schema schemaInt8 = Schema.getSchema(schemaInfoInt8);
         List<PulsarColumnHandle> pulsarColumnHandleInt8 = getColumnColumnHandles(topicName, schemaInfoInt8, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
         PulsarRowDecoder pulsarRowDecoderInt8 = decoderFactory.createRowDecoder(topicName, schemaInfoInt8,
@@ -78,7 +78,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester {
                 PRIMITIVE_COLUMN_NAME, TINYINT, false, false, PRIMITIVE_COLUMN_NAME, null, null, PulsarColumnHandle.HandleKeyValueType.NONE), int8Value);
 
         short int16Value = 2;
-        SchemaInfo schemaInfoInt16 = SchemaInfoImpl.builder().type(SchemaType.INT16).build();
+        SchemaInfo schemaInfoInt16 = SchemaInfo.builder().type(SchemaType.INT16).build();
         Schema schemaInt16 = Schema.getSchema(schemaInfoInt16);
         List<PulsarColumnHandle> pulsarColumnHandleInt16 = getColumnColumnHandles(topicName, schemaInfoInt16, PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
         PulsarRowDecoder pulsarRowDecoderInt16 = decoderFactory.createRowDecoder(topicName, schemaInfoInt16,
@@ -90,7 +90,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester {
                 PRIMITIVE_COLUMN_NAME, SMALLINT, false, false, PRIMITIVE_COLUMN_NAME, null, null, PulsarColumnHandle.HandleKeyValueType.NONE), int16Value);
 
         int int32Value = 2;
-        SchemaInfo schemaInfoInt32 = SchemaInfoImpl.builder().type(SchemaType.INT32).build();
+        SchemaInfo schemaInfoInt32 = SchemaInfo.builder().type(SchemaType.INT32).build();
         Schema schemaInt32 = Schema.getSchema(schemaInfoInt32);
         List<PulsarColumnHandle> pulsarColumnHandleInt32 = getColumnColumnHandles(topicName, schemaInfoInt32,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -103,7 +103,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester {
                 PRIMITIVE_COLUMN_NAME, INTEGER, false, false, PRIMITIVE_COLUMN_NAME, null, null, PulsarColumnHandle.HandleKeyValueType.NONE), int32Value);
 
         long int64Value = 2;
-        SchemaInfo schemaInfoInt64 = SchemaInfoImpl.builder().type(SchemaType.INT64).build();
+        SchemaInfo schemaInfoInt64 = SchemaInfo.builder().type(SchemaType.INT64).build();
         Schema schemaInt64 = Schema.getSchema(schemaInfoInt64);
         List<PulsarColumnHandle> pulsarColumnHandleInt64 = getColumnColumnHandles(topicName, schemaInfoInt64,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -117,7 +117,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester {
                 PulsarColumnHandle.HandleKeyValueType.NONE), int64Value);
 
         String stringValue = "test";
-        SchemaInfo schemaInfoString = SchemaInfoImpl.builder().type(SchemaType.STRING).build();
+        SchemaInfo schemaInfoString = SchemaInfo.builder().type(SchemaType.STRING).build();
         Schema schemaString = Schema.getSchema(schemaInfoString);
         List<PulsarColumnHandle> pulsarColumnHandleString = getColumnColumnHandles(topicName, schemaInfoString,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -131,7 +131,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester {
                 PulsarColumnHandle.HandleKeyValueType.NONE), stringValue);
 
         float floatValue = 0.2f;
-        SchemaInfo schemaInfoFloat = SchemaInfoImpl.builder().type(SchemaType.FLOAT).build();
+        SchemaInfo schemaInfoFloat = SchemaInfo.builder().type(SchemaType.FLOAT).build();
         Schema schemaFloat = Schema.getSchema(schemaInfoFloat);
         List<PulsarColumnHandle> pulsarColumnHandleFloat = getColumnColumnHandles(topicName, schemaInfoFloat,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -145,7 +145,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester {
                 PulsarColumnHandle.HandleKeyValueType.NONE), Long.valueOf(Float.floatToIntBits(floatValue)));
 
         double doubleValue = 0.22d;
-        SchemaInfo schemaInfoDouble = SchemaInfoImpl.builder().type(SchemaType.DOUBLE).build();
+        SchemaInfo schemaInfoDouble = SchemaInfo.builder().type(SchemaType.DOUBLE).build();
         Schema schemaDouble = Schema.getSchema(schemaInfoDouble);
         List<PulsarColumnHandle> pulsarColumnHandleDouble = getColumnColumnHandles(topicName, schemaInfoDouble,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -159,7 +159,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester {
                 PulsarColumnHandle.HandleKeyValueType.NONE), doubleValue);
 
         boolean booleanValue = true;
-        SchemaInfo schemaInfoBoolean = SchemaInfoImpl.builder().type(SchemaType.BOOLEAN).build();
+        SchemaInfo schemaInfoBoolean = SchemaInfo.builder().type(SchemaType.BOOLEAN).build();
         Schema schemaBoolean = Schema.getSchema(schemaInfoBoolean);
         List<PulsarColumnHandle> pulsarColumnHandleBoolean = getColumnColumnHandles(topicName, schemaInfoBoolean,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -174,7 +174,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester {
 
         byte[] bytesValue = new byte[1];
         bytesValue[0] = 1;
-        SchemaInfo schemaInfoBytes = SchemaInfoImpl.builder().type(SchemaType.BYTES).build();
+        SchemaInfo schemaInfoBytes = SchemaInfo.builder().type(SchemaType.BYTES).build();
         Schema schemaBytes = Schema.getSchema(schemaInfoBytes);
         List<PulsarColumnHandle> pulsarColumnHandleBytes = getColumnColumnHandles(topicName, schemaInfoBytes,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -188,7 +188,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester {
                 PulsarColumnHandle.HandleKeyValueType.NONE), Slices.wrappedBuffer(bytesValue));
 
         Date dateValue = new Date(System.currentTimeMillis());
-        SchemaInfo schemaInfoDate = SchemaInfoImpl.builder().type(SchemaType.DATE).build();
+        SchemaInfo schemaInfoDate = SchemaInfo.builder().type(SchemaType.DATE).build();
         Schema schemaDate = Schema.getSchema(schemaInfoDate);
         List<PulsarColumnHandle> pulsarColumnHandleDate = getColumnColumnHandles(topicName, schemaInfoDate,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -202,7 +202,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester {
                 PulsarColumnHandle.HandleKeyValueType.NONE), dateValue.getTime());
 
         Time timeValue = new Time(System.currentTimeMillis());
-        SchemaInfo schemaInfoTime = SchemaInfoImpl.builder().type(SchemaType.TIME).build();
+        SchemaInfo schemaInfoTime = SchemaInfo.builder().type(SchemaType.TIME).build();
         Schema schemaTime = Schema.getSchema(schemaInfoTime);
         List<PulsarColumnHandle> pulsarColumnHandleTime = getColumnColumnHandles(topicName, schemaInfoTime,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);
@@ -216,7 +216,7 @@ public class TestPrimitiveDecoder extends AbstractDecoderTester {
                 PulsarColumnHandle.HandleKeyValueType.NONE), timeValue.getTime());
 
         Timestamp timestampValue = new Timestamp(System.currentTimeMillis());
-        SchemaInfo schemaInfoTimestamp = SchemaInfoImpl.builder().type(SchemaType.TIMESTAMP).build();
+        SchemaInfo schemaInfoTimestamp = SchemaInfo.builder().type(SchemaType.TIMESTAMP).build();
         Schema schemaTimestamp = Schema.getSchema(schemaInfoTimestamp);
         List<PulsarColumnHandle> pulsarColumnHandleTimestamp = getColumnColumnHandles(topicName, schemaInfoTimestamp,
                 PulsarColumnHandle.HandleKeyValueType.NONE, false, decoderFactory);