Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/06/11 00:23:07 UTC

[pulsar] branch branch-2.8 updated (63126b9 -> 3505b98)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a change to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git.


    from 63126b9  Revert "[Issue 8751] Update Dockerfile for Pulsar and Dashboard to Cr… (#10861)
     new 263df02  pulsar-perf: print total number of messages (#10765)
     new d280fad  [Issue 10860][pulsar-metadata] Ensure metadata cache consistency across brokers (#10862)
     new e44be07  [Broker] Always let system topic TRANSACTION_BUFFER_SNAPSHOT be auto created (#10876)
     new 92aff2b  #10882 use ObjectMapper to parse Sink/Source configs (#10883)
     new a750f97  Introduce metrics servlet timeout setting (#10886)
     new 1ecadd4  Make KeyValueSchema an interface visible in the public Schema API (#10888)
     new 8252f9d  Provide a flag to disable managed ledger metrics (#10885)
     new 3505b98  Kafka connect sink adaptor to support non-primitive schemas (#10410)

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 conf/broker.conf                                   |   9 +
 conf/standalone.conf                               |   5 +
 pom.xml                                            |   1 -
 .../apache/pulsar/broker/ServiceConfiguration.java |  13 ++
 .../pulsar/broker/service/BrokerService.java       |   5 +-
 .../service/persistent/PersistentSubscription.java |   2 +-
 .../broker/service/persistent/PersistentTopic.java |   2 +-
 .../prometheus/PrometheusMetricsGenerator.java     |   8 +-
 .../stats/prometheus/PrometheusMetricsServlet.java |   3 +
 .../BrokerServiceAutoTopicCreationTest.java        |  27 +++
 .../apache/pulsar/broker/service/KeyValueTest.java |  16 +-
 .../pulsar/broker/service/NullValueTest.java       |  10 +-
 .../KeyValueSchemaCompatibilityCheckTest.java      | 118 ++++++------
 .../apache/pulsar/client/api/SimpleSchemaTest.java |  16 +-
 .../java/org/apache/pulsar/schema/SchemaTest.java  |  12 +-
 .../pulsar/client/api/schema/KeyValueSchema.java   |  31 ++-
 .../client/internal/DefaultImplementation.java     |   8 +-
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java |  21 ++-
 .../org/apache/pulsar/admin/cli/CmdSources.java    |  26 ++-
 .../org/apache/pulsar/admin/cli/TestCmdSinks.java  |  16 +-
 .../apache/pulsar/admin/cli/TestCmdSources.java    |  18 +-
 .../org/apache/pulsar/client/impl/MessageImpl.java |  12 +-
 .../client/impl/TypedMessageBuilderImpl.java       |  10 +-
 .../client/impl/schema/AutoConsumeSchema.java      |   2 +-
 .../client/impl/schema/AutoProduceBytesSchema.java |   6 +-
 ...KeyValueSchema.java => KeyValueSchemaImpl.java} |  63 +++++--
 .../client/impl/schema/KeyValueSchemaTest.java     |  48 ++---
 .../SupportVersioningKeyValueSchemaTest.java       |  12 +-
 .../impl/schema/generic/GenericSchemaImplTest.java |   6 +-
 .../impl/schema/generic/GenericSchemaTest.java     |   6 +-
 .../pulsar/common/events/EventsTopicNames.java     |  27 ++-
 .../pulsar/functions/instance/SinkRecord.java      |   4 +-
 .../apache/pulsar/functions/sink/PulsarSink.java   |   2 +-
 .../pulsar/io/kafka/connect/KafkaConnectSink.java  |  71 +------
 .../connect/schema/PulsarSchemaToKafkaSchema.java  | 105 +++++++++++
 .../io/kafka/connect/KafkaConnectSinkTest.java     |  33 +++-
 .../connect/PulsarSchemaToKafkaSchemaTest.java     | 208 +++++++++++++++++++++
 .../metadata/cache/impl/MetadataCacheImpl.java     |  21 ++-
 .../apache/pulsar/metadata/MetadataCacheTest.java  |  71 +++++++
 .../pulsar/sql/presto/TestPulsarRecordCursor.java  |   8 +-
 .../pulsar/testclient/PerformanceConsumer.java     |   6 +-
 .../pulsar/testclient/PerformanceProducer.java     |   6 +-
 .../integration/io/TestGenericObjectSink.java      |   2 +-
 .../tests/integration/io/PulsarIOTestRunner.java   |   7 +-
 .../tests/integration/presto/TestBasicPresto.java  |   8 +-
 45 files changed, 802 insertions(+), 309 deletions(-)
 copy pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/KVRecord.java => pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/KeyValueSchema.java (60%)
 rename pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/{KeyValueSchema.java => KeyValueSchemaImpl.java} (85%)
 create mode 100644 pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
 create mode 100644 pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java

[pulsar] 07/08: Provide a flag to disable managed ledger metrics (#10885)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8252f9db7f2f0382e19e4e715352ccb48a5c638a
Author: Sijie Guo <si...@apache.org>
AuthorDate: Thu Jun 10 21:58:35 2021 +0800

    Provide a flag to disable managed ledger metrics (#10885)
    
    
    (cherry picked from commit 3de089c3b54b7c35c5f18bcf5437c73a20ff4594)
---
 conf/broker.conf                                                  | 4 ++++
 .../main/java/org/apache/pulsar/broker/ServiceConfiguration.java  | 5 +++++
 .../broker/stats/prometheus/PrometheusMetricsGenerator.java       | 8 +++++---
 3 files changed, 14 insertions(+), 3 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index dc9d732..e8c7877 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1137,8 +1137,12 @@ exposeConsumerLevelMetricsInPrometheus=false
 # Enable producer level metrics. default is false
 exposeProducerLevelMetricsInPrometheus=false
 
+# Enable managed ledger metrics (aggregated by namespace). default is true
+exposeManagedLedgerMetricsInPrometheus=true
+
 # Enable cursor level metrics. default is false
 exposeManagedCursorMetricsInPrometheus=false
+
 # Classname of Pluggable JVM GC metrics logger that can log GC specific metrics
 # jvmGCMetricsLoggerClassName=
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 2299e42..8bd05c2 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1945,6 +1945,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private boolean exposeProducerLevelMetricsInPrometheus = false;
     @FieldContext(
             category = CATEGORY_METRICS,
+            doc = "If true, export managed ledger metrics (aggregated by namespace)"
+    )
+    private boolean exposeManagedLedgerMetricsInPrometheus = true;
+    @FieldContext(
+            category = CATEGORY_METRICS,
             doc = "If true, export managed cursor metrics"
     )
     private boolean exposeManagedCursorMetricsInPrometheus = false;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
index 08af52c..8f303a3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java
@@ -131,14 +131,16 @@ public class PrometheusMetricsGenerator {
         parseMetricsToPrometheusMetrics(new ManagedLedgerCacheMetrics(pulsar).generate(),
                 clusterName, Collector.Type.GAUGE, stream);
 
-        // generate managedLedger metrics
-        parseMetricsToPrometheusMetrics(new ManagedLedgerMetrics(pulsar).generate(),
+        if (pulsar.getConfiguration().isExposeManagedLedgerMetricsInPrometheus()) {
+            // generate managedLedger metrics
+            parseMetricsToPrometheusMetrics(new ManagedLedgerMetrics(pulsar).generate(),
                 clusterName, Collector.Type.GAUGE, stream);
+        }
 
         if (pulsar.getConfiguration().isExposeManagedCursorMetricsInPrometheus()) {
             // generate managedCursor metrics
             parseMetricsToPrometheusMetrics(new ManagedCursorMetrics(pulsar).generate(),
-                    clusterName, Collector.Type.GAUGE, stream);
+                clusterName, Collector.Type.GAUGE, stream);
         }
 
         parseMetricsToPrometheusMetrics(Collections.singletonList(pulsar.getBrokerService()
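
With managed ledger metrics now gated behind the new flag, brokers serving very large namespaces can shed that aggregation cost entirely. A minimal broker.conf sketch (illustrative values, not shipped defaults):

    # skip namespace-aggregated managed ledger metrics on a topic-heavy broker
    exposeManagedLedgerMetricsInPrometheus=false
    # cursor metrics remain under their own, independent flag
    exposeManagedCursorMetricsInPrometheus=false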

[pulsar] 03/08: [Broker] Always let system topic TRANSACTION_BUFFER_SNAPSHOT be auto created (#10876)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e44be070d4bd6ee111a0ce03cb1b40fc3d71e842
Author: Michael Marshall <mi...@datastax.com>
AuthorDate: Wed Jun 9 22:57:30 2021 -0600

    [Broker] Always let system topic TRANSACTION_BUFFER_SNAPSHOT be auto created (#10876)
    
    * Always let system topic TRANSACTION_BUFFER_SNAPSHOT be auto created
    
    * Remove unused import
    
    * Add EVENTS_TOPIC_NAMES set; add unit tests
    
    (cherry picked from commit b1b3b3c65c6d889a5d81e8a877139ed2110eb58d)
---
 .../pulsar/broker/service/BrokerService.java       |  5 ++--
 .../service/persistent/PersistentSubscription.java |  2 +-
 .../broker/service/persistent/PersistentTopic.java |  2 +-
 .../BrokerServiceAutoTopicCreationTest.java        | 27 ++++++++++++++++++++++
 .../pulsar/common/events/EventsTopicNames.java     | 27 ++++++++++++++--------
 5 files changed, 48 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 6089f11..575738a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -26,6 +26,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
 import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
+import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
@@ -128,7 +129,6 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.configuration.FieldContext;
-import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor;
 import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
 import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
@@ -2490,8 +2490,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
 
     public boolean isAllowAutoTopicCreation(final TopicName topicName) {
         //System topic can always be created automatically
-        if (EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME.equals(topicName.getLocalName())
-                && pulsar.getConfiguration().isSystemTopicEnabled()) {
+        if (pulsar.getConfiguration().isSystemTopicEnabled() && checkTopicIsEventsNames(topicName)) {
             return true;
         }
         AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 159f39a..16f2259 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -141,7 +141,7 @@ public class PersistentSubscription implements Subscription {
         this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this);
         this.setReplicated(replicated);
         if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
-                && !checkTopicIsEventsNames(topicName)
+                && !checkTopicIsEventsNames(TopicName.get(topicName))
                 && !topicName.startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName())
                 && !topicName.startsWith(MLTransactionLogImpl.TRANSACTION_LOG_PREFIX)
                 && !topicName.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX)) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index c893f8f..65a526e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -316,7 +316,7 @@ public class PersistentTopic extends AbstractTopic
         checkReplicatedSubscriptionControllerState();
         TopicName topicName = TopicName.get(topic);
         if (brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled()
-                && !checkTopicIsEventsNames(topic)
+                && !checkTopicIsEventsNames(topicName)
                 && !topicName.getEncodedLocalName().startsWith(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName())
                 && !topicName.getEncodedLocalName().startsWith(MLTransactionLogImpl.TRANSACTION_LOG_PREFIX)
                 && !topicName.getEncodedLocalName().endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX)) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
index f7fffda..7b74933 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -25,6 +25,7 @@ import static org.testng.Assert.fail;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.TopicType;
@@ -366,4 +367,30 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
             assertTrue(e instanceof PulsarClientException);
         }
     }
+
+    @Test
+    public void testAutoCreationOfSystemTopicTransactionBufferSnapshot() throws Exception {
+        pulsar.getConfiguration().setAllowAutoTopicCreation(false);
+        pulsar.getConfiguration().setSystemTopicEnabled(true);
+
+        final String topicString = "persistent://prop/ns-abc/" + EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT;
+
+        pulsarClient.newProducer().topic(topicString).create();
+
+        assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
+        assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
+    }
+
+    @Test
+    public void testAutoCreationOfSystemTopicNamespaceEvents() throws Exception {
+        pulsar.getConfiguration().setAllowAutoTopicCreation(false);
+        pulsar.getConfiguration().setSystemTopicEnabled(true);
+
+        final String topicString = "persistent://prop/ns-abc/" + EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
+
+        pulsarClient.newProducer().topic(topicString).create();
+
+        assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
+        assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
+    }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java
index 380abae..cf37a30 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/events/EventsTopicNames.java
@@ -18,12 +18,18 @@
  */
 package org.apache.pulsar.common.events;
 
+import org.apache.pulsar.common.naming.TopicName;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
 /**
- * System topic name for the event type.
+ * System topic names for each {@link EventType}.
  */
 public class EventsTopicNames {
 
-
     /**
      * Local topic name for the namespace events.
      */
@@ -34,13 +40,14 @@ public class EventsTopicNames {
      */
     public static final String TRANSACTION_BUFFER_SNAPSHOT = "__transaction_buffer_snapshot";
 
-    public static boolean checkTopicIsEventsNames(String topicName) {
-        if (topicName.endsWith(NAMESPACE_EVENTS_LOCAL_NAME)) {
-            return true;
-        } else if (topicName.endsWith(TRANSACTION_BUFFER_SNAPSHOT)) {
-            return true;
-        } else {
-            return false;
-        }
+    /**
+     * The set of all local topic names declared above.
+     */
+    public static final Set<String> EVENTS_TOPIC_NAMES =
+        Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(NAMESPACE_EVENTS_LOCAL_NAME, TRANSACTION_BUFFER_SNAPSHOT)));
+
+    public static boolean checkTopicIsEventsNames(TopicName topicName) {
+        return EVENTS_TOPIC_NAMES.contains(topicName.getLocalName());
     }
 }
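
Note the behavioral tightening in the helper: the old code did a suffix match on the raw topic string, while the new code matches the parsed local name exactly against EVENTS_TOPIC_NAMES. A minimal sketch of the new check (topic names are illustrative):

    import org.apache.pulsar.common.events.EventsTopicNames;
    import org.apache.pulsar.common.naming.TopicName;

    public class EventsTopicCheckDemo {
        public static void main(String[] args) {
            // both declared system topics pass the new check
            TopicName t = TopicName.get(
                    "persistent://prop/ns-abc/" + EventsTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
            System.out.println(EventsTopicNames.checkTopicIsEventsNames(t));      // true

            // a user topic that merely ends with the same suffix no longer matches
            TopicName other = TopicName.get(
                    "persistent://prop/ns-abc/my__transaction_buffer_snapshot");
            System.out.println(EventsTopicNames.checkTopicIsEventsNames(other));  // false
        }
    }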

[pulsar] 05/08: Introduce metrics servlet timeout setting (#10886)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit a750f975799a8690535fa493a0b09a3a226c06db
Author: Sijie Guo <si...@apache.org>
AuthorDate: Thu Jun 10 15:02:35 2021 +0800

    Introduce metrics servlet timeout setting (#10886)
    
    *Motivation*
    
    Generating metrics is an expensive task and it can take more than 30 seconds
    if you have close to or more than a million topics.
    
    *Modification*
    
    This change provides a setting to adjust the async context timeout.
    
    (cherry picked from commit c75d45b3f9be7a210938f5e87c7b4301e4fd25ee)
---
 conf/broker.conf                                                  | 5 +++++
 conf/standalone.conf                                              | 5 +++++
 .../main/java/org/apache/pulsar/broker/ServiceConfiguration.java  | 8 ++++++++
 .../pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java  | 3 +++
 4 files changed, 21 insertions(+)

diff --git a/conf/broker.conf b/conf/broker.conf
index f0f0da3..dc9d732 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1142,6 +1142,11 @@ exposeManagedCursorMetricsInPrometheus=false
 # Classname of Pluggable JVM GC metrics logger that can log GC specific metrics
 # jvmGCMetricsLoggerClassName=
 
+# Time in milliseconds that metrics endpoint would time out. Default is 30s.
+# Increase it if there are a lot of topics to expose topic-level metrics.
+# Set it to 0 to disable timeout.
+metricsServletTimeoutMs=30000
+
 ### --- Functions --- ###
 
 # Enable Functions Worker Service in Broker
diff --git a/conf/standalone.conf b/conf/standalone.conf
index ebb110a..7c818d3 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -831,6 +831,11 @@ webSocketMaxTextFrameSize=1048576
 # Enable topic level metrics
 exposeTopicLevelMetricsInPrometheus=true
 
+# Time in milliseconds that metrics endpoint would time out. Default is 30s.
+# Increase it if there are a lot of topics to expose topic-level metrics.
+# Set it to 0 to disable timeout.
+metricsServletTimeoutMs=30000
+
 # Classname of Pluggable JVM GC metrics logger that can log GC specific metrics
 # jvmGCMetricsLoggerClassName=
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 21cdcef..2299e42 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1962,6 +1962,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private boolean exposePreciseBacklogInPrometheus = false;
 
     @FieldContext(
+        category = CATEGORY_METRICS,
+        doc = "Time in milliseconds that metrics endpoint would time out. Default is 30s.\n" +
+            " Increase it if there are a lot of topics to expose topic-level metrics.\n" +
+            " Set it to 0 to disable timeout."
+    )
+    private long metricsServletTimeoutMs = 30000;
+
+    @FieldContext(
             category = CATEGORY_METRICS,
             doc = "Enable expose the backlog size for each subscription when generating stats.\n" +
                     " Locking is used for fetching the status so default to false."
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
index 7e469a2..026c26f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
@@ -43,6 +43,7 @@ public class PrometheusMetricsServlet extends HttpServlet {
     private final boolean shouldExportTopicMetrics;
     private final boolean shouldExportConsumerMetrics;
     private final boolean shouldExportProducerMetrics;
+    private final long metricsServletTimeoutMs;
     private List<PrometheusRawMetricsProvider> metricsProviders;
 
     private ExecutorService executor = null;
@@ -53,6 +54,7 @@ public class PrometheusMetricsServlet extends HttpServlet {
         this.shouldExportTopicMetrics = includeTopicMetrics;
         this.shouldExportConsumerMetrics = includeConsumerMetrics;
         this.shouldExportProducerMetrics = shouldExportProducerMetrics;
+        this.metricsServletTimeoutMs = pulsar.getConfiguration().getMetricsServletTimeoutMs();
     }
 
     @Override
@@ -64,6 +66,7 @@ public class PrometheusMetricsServlet extends HttpServlet {
     protected void doGet(HttpServletRequest request, HttpServletResponse response)
             throws ServletException, IOException {
         AsyncContext context = request.startAsync();
+        context.setTimeout(metricsServletTimeoutMs);
         executor.execute(safeRun(() -> {
             HttpServletResponse res = (HttpServletResponse) context.getResponse();
             try {
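
For deployments where a scrape legitimately needs more than the 30s default, the new knob can be raised, or set to 0 to disable the timeout entirely. An illustrative broker.conf sketch (the value is an example, not a recommendation):

    # allow the Prometheus endpoint up to two minutes on topic-heavy brokers
    metricsServletTimeoutMs=120000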

[pulsar] 04/08: #10882 use ObjectMapper to parse Sink/Source configs (#10883)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 92aff2bbb835ffcca509e227a5af06ec7a20672b
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Thu Jun 10 14:56:44 2021 +0800

    #10882 use ObjectMapper to parse Sink/Source configs (#10883)
    
    Fixes #10882
    
    ### Motivation
    
    CmdSinks and CmdSources use `gson` to parse the JSON configs passed in from pulsar-admin, but most connectors use ObjectMapper to deserialize the config into their actual config class. `gson` also converts int/long values into floats by default, which prevents ObjectMapper from correctly parsing those values back into int/long fields.
    
    ### Modifications
    
    Use ObjectMapper to parse the sink/source configs.
    
    (cherry picked from commit 2c9ea8113cbe0d2cc97e4e308f5ed0487fd13c1e)
---
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 21 ++++++++++++-----
 .../org/apache/pulsar/admin/cli/CmdSources.java    | 26 ++++++++++++++++------
 .../org/apache/pulsar/admin/cli/TestCmdSinks.java  | 16 +++++++++++--
 .../apache/pulsar/admin/cli/TestCmdSources.java    | 18 +++++++++++++--
 4 files changed, 65 insertions(+), 16 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 16e1664..b44affb 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -27,6 +27,9 @@ import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
 import com.beust.jcommander.converters.StringConverter;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.reflect.TypeToken;
@@ -55,6 +58,7 @@ import org.apache.pulsar.common.functions.UpdateOptionsImpl;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 
 @Getter
 @Parameters(commandDescription = "Interface for managing Pulsar IO sinks (egress data from Pulsar)")
@@ -463,8 +467,12 @@ public class CmdSinks extends CmdBase {
                 sinkConfig.setResources(resources);
             }
 
-            if (null != sinkConfigString) {
-                sinkConfig.setConfigs(parseConfigs(sinkConfigString));
+            try {
+                if (null != sinkConfigString) {
+                    sinkConfig.setConfigs(parseConfigs(sinkConfigString));
+                }
+            } catch (Exception ex) {
+                throw new ParameterException("Cannot parse sink-config", ex);
             }
 
             if (autoAck != null) {
@@ -485,9 +493,12 @@ public class CmdSinks extends CmdBase {
             validateSinkConfigs(sinkConfig);
         }
 
-        protected Map<String, Object> parseConfigs(String str) {
-            Type type = new TypeToken<Map<String, Object>>(){}.getType();
-            return new Gson().fromJson(str, type);
+        protected Map<String, Object> parseConfigs(String str) throws JsonProcessingException {
+            ObjectMapper mapper = ObjectMapperFactory.getThreadLocal();
+            TypeReference<HashMap<String,Object>> typeRef
+                = new TypeReference<HashMap<String,Object>>() {};
+
+            return mapper.readValue(str, typeRef);
         }
 
         protected void validateSinkConfigs(SinkConfig sinkConfig) {
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 3ab6ccc..337847e 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -27,6 +27,9 @@ import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
 import com.beust.jcommander.Parameters;
 import com.beust.jcommander.converters.StringConverter;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.reflect.TypeToken;
@@ -35,6 +38,7 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Type;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -58,6 +62,7 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 
 @Getter
 @Parameters(commandDescription = "Interface for managing Pulsar IO Sources (ingress data into Pulsar)")
@@ -417,8 +422,12 @@ public class CmdSources extends CmdBase {
                 sourceConfig.setResources(resources);
             }
 
-            if (null != sourceConfigString) {
-                sourceConfig.setConfigs(parseConfigs(sourceConfigString));
+            try {
+                if (null != sourceConfigString) {
+                    sourceConfig.setConfigs(parseConfigs(sourceConfigString));
+                }
+            } catch (Exception ex) {
+                throw new ParameterException("Cannot parse source-config", ex);
             }
             
             if (null != batchSourceConfigString) {
@@ -432,12 +441,15 @@ public class CmdSources extends CmdBase {
             validateSourceConfigs(sourceConfig);
         }
 
-        protected Map<String, Object> parseConfigs(String str) {
-            Type type = new TypeToken<Map<String, Object>>(){}.getType();
-            return new Gson().fromJson(str, type); 
+        protected Map<String, Object> parseConfigs(String str) throws JsonProcessingException {
+            ObjectMapper mapper = ObjectMapperFactory.getThreadLocal();
+            TypeReference<HashMap<String,Object>> typeRef
+                    = new TypeReference<HashMap<String,Object>>() {};
+
+            return mapper.readValue(str, typeRef);
         }
-        
-        protected BatchSourceConfig parseBatchSourceConfigs(String str) {
+
+        protected BatchSourceConfig parseBatchSourceConfigs(String str) {
         	return new Gson().fromJson(str, BatchSourceConfig.class);
         }
 
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index 1f4819d..87f9a3f 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 
 import com.beust.jcommander.ParameterException;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
 import java.io.Closeable;
 import java.io.File;
@@ -94,7 +95,7 @@ public class TestCmdSinks {
     private static final Double CPU = 100.0;
     private static final Long RAM = 1024L * 1024L;
     private static final Long DISK = 1024L * 1024L * 1024L;
-    private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon Jul 02 00:33:15 0000 2018\"}";
+    private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon Jul 02 00:33:15 +0000 2018\",\"int\":1000,\"int_string\":\"1000\",\"float\":1000.0,\"float_string\":\"1000.0\"}";
 
     private PulsarAdmin pulsarAdmin;
     private Sinks sink;
@@ -144,7 +145,7 @@ public class TestCmdSinks {
         }
     }
 
-    public SinkConfig getSinkConfig() {
+    public SinkConfig getSinkConfig() throws JsonProcessingException {
         SinkConfig sinkConfig = new SinkConfig();
         sinkConfig.setTenant(TENANT);
         sinkConfig.setNamespace(NAMESPACE);
@@ -738,4 +739,15 @@ public class TestCmdSinks {
 
 
     }
+
+    @Test
+    public void testParseConfigs() throws Exception {
+        SinkConfig testSinkConfig = getSinkConfig();
+        Map<String, Object> config = testSinkConfig.getConfigs();
+        Assert.assertEquals(config.get("int"), 1000);
+        Assert.assertEquals(config.get("int_string"), "1000");
+        Assert.assertEquals(config.get("float"), 1000.0);
+        Assert.assertEquals(config.get("float_string"), "1000.0");
+        Assert.assertEquals(config.get("created_at"), "Mon Jul 02 00:33:15 +0000 2018");
+    }
 }
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index 23791ae..13e0ff0 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -28,11 +28,13 @@ import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 
 import com.beust.jcommander.ParameterException;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.util.Map;
 
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -75,7 +77,8 @@ public class TestCmdSources {
     private static final Double CPU = 100.0;
     private static final Long RAM = 1024L * 1024L;
     private static final Long DISK = 1024L * 1024L * 1024L;
-    private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon Jul 02 00:33:15 +0000 2018\"}";
+    private static final String SINK_CONFIG_STRING =
+            "{\"created_at\":\"Mon Jul 02 00:33:15 +0000 2018\",\"int\":1000,\"int_string\":\"1000\",\"float\":1000.0,\"float_string\":\"1000.0\"}";
     private static final String BATCH_SOURCE_CONFIG_STRING = "{ \"discoveryTriggererClassName\" : \"org.apache.pulsar.io.batchdiscovery.CronTriggerer\","
 			+ "\"discoveryTriggererConfig\": {\"cron\": \"5 0 0 0 0 *\"} }";
 
@@ -122,7 +125,7 @@ public class TestCmdSources {
         }
     }
 
-    public SourceConfig getSourceConfig() {
+    public SourceConfig getSourceConfig() throws JsonProcessingException {
         SourceConfig sourceConfig = new SourceConfig();
         sourceConfig.setTenant(TENANT);
         sourceConfig.setNamespace(NAMESPACE);
@@ -690,4 +693,15 @@ public class TestCmdSources {
 
 
     }
+
+    @Test
+    public void testParseConfigs() throws Exception {
+        SourceConfig testSourceConfig = getSourceConfig();
+        Map<String, Object> config = testSourceConfig.getConfigs();
+        Assert.assertEquals(config.get("int"), 1000);
+        Assert.assertEquals(config.get("int_string"), "1000");
+        Assert.assertEquals(config.get("float"), 1000.0);
+        Assert.assertEquals(config.get("float_string"), "1000.0");
+        Assert.assertEquals(config.get("created_at"), "Mon Jul 02 00:33:15 +0000 2018");
+    }
 }
\ No newline at end of file
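
The number-coercion difference that motivated this change is easy to reproduce outside Pulsar. A self-contained sketch using plain Gson and Jackson (a standalone demo, not the pulsar-admin code path):

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.google.gson.Gson;
    import com.google.gson.reflect.TypeToken;
    import java.lang.reflect.Type;
    import java.util.HashMap;
    import java.util.Map;

    public class NumberCoercionDemo {
        public static void main(String[] args) throws Exception {
            String json = "{\"int\":1000}";

            // Gson has no concrete target type for the values, so every JSON
            // number in the map comes back as a Double
            Type type = new TypeToken<Map<String, Object>>() {}.getType();
            Map<String, Object> viaGson = new Gson().fromJson(json, type);
            System.out.println(viaGson.get("int"));    // 1000.0

            // Jackson keeps integral JSON numbers as Integer/Long
            Map<String, Object> viaJackson = new ObjectMapper()
                    .readValue(json, new TypeReference<HashMap<String, Object>>() {});
            System.out.println(viaJackson.get("int")); // 1000
        }
    }

A connector whose config class expects 1000 would previously have been handed 1000.0, which is exactly the behavior the new testParseConfigs cases pin down.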

[pulsar] 08/08: Kafka connect sink adaptor to support non-primitive schemas (#10410)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3505b98b7ea0c115bb946c7a225a6b6b1e2ac42b
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Thu Jun 10 13:39:14 2021 -0700

    Kafka connect sink adaptor to support non-primitive schemas (#10410)
    
    (cherry picked from commit cf50da373500416dd2e9243088ec0b58496fa7bd)
---
 pom.xml                                            |   1 -
 .../pulsar/io/kafka/connect/KafkaConnectSink.java  |  68 +------
 .../connect/schema/PulsarSchemaToKafkaSchema.java  | 105 +++++++++++
 .../io/kafka/connect/KafkaConnectSinkTest.java     |  33 +++-
 .../connect/PulsarSchemaToKafkaSchemaTest.java     | 208 +++++++++++++++++++++
 5 files changed, 341 insertions(+), 74 deletions(-)

diff --git a/pom.xml b/pom.xml
index de59f44..d24e404 100644
--- a/pom.xml
+++ b/pom.xml
@@ -210,7 +210,6 @@ flexible messaging model and an intuitive client API.</description>
     <javassist.version>3.25.0-GA</javassist.version>
     <failsafe.version>2.3.1</failsafe.version>
     <skyscreamer.version>1.5.0</skyscreamer.version>
-    <confluent.version>5.2.2</confluent.version>
     <objenesis.version>3.1</objenesis.version>
     <awaitility.version>4.0.3</awaitility.version>
 
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index a75f1ee..cdd6c04b 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.io.kafka.connect;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -42,6 +41,7 @@ import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
 
 import java.util.List;
 import java.util.Map;
@@ -59,37 +59,8 @@ import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.PULSAR_
 
 @Slf4j
 public class KafkaConnectSink implements Sink<GenericObject> {
-
     private boolean unwrapKeyValueIfAvailable;
 
-    private final static ImmutableMap<Class<?>, Schema> primitiveTypeToSchema;
-    private final static ImmutableMap<SchemaType, Schema> pulsarSchemaTypeTypeToKafkaSchema;
-
-    static {
-        primitiveTypeToSchema = ImmutableMap.<Class<?>, Schema>builder()
-                .put(Boolean.class, Schema.BOOLEAN_SCHEMA)
-                .put(Byte.class, Schema.INT8_SCHEMA)
-                .put(Short.class, Schema.INT16_SCHEMA)
-                .put(Integer.class, Schema.INT32_SCHEMA)
-                .put(Long.class, Schema.INT64_SCHEMA)
-                .put(Float.class, Schema.FLOAT32_SCHEMA)
-                .put(Double.class, Schema.FLOAT64_SCHEMA)
-                .put(String.class, Schema.STRING_SCHEMA)
-                .put(byte[].class, Schema.BYTES_SCHEMA)
-                .build();
-        pulsarSchemaTypeTypeToKafkaSchema = ImmutableMap.<SchemaType, Schema>builder()
-                .put(SchemaType.BOOLEAN, Schema.BOOLEAN_SCHEMA)
-                .put(SchemaType.INT8, Schema.INT8_SCHEMA)
-                .put(SchemaType.INT16, Schema.INT16_SCHEMA)
-                .put(SchemaType.INT32, Schema.INT32_SCHEMA)
-                .put(SchemaType.INT64, Schema.INT64_SCHEMA)
-                .put(SchemaType.FLOAT, Schema.FLOAT32_SCHEMA)
-                .put(SchemaType.DOUBLE, Schema.FLOAT64_SCHEMA)
-                .put(SchemaType.STRING, Schema.STRING_SCHEMA)
-                .put(SchemaType.BYTES, Schema.BYTES_SCHEMA)
-                .build();
-    }
-
     private PulsarKafkaSinkContext sinkContext;
     @VisibleForTesting
     PulsarKafkaSinkTaskContext taskContext;
@@ -252,37 +223,6 @@ public class KafkaConnectSink implements Sink<GenericObject> {
         }
     }
 
-    /**
-     * org.apache.kafka.connect.data.Schema for the object
-     * @param obj - Object to get schema of.
-     * @return org.apache.kafka.connect.data.Schema
-     */
-    private static Schema getKafkaConnectSchemaForObject(Object obj) {
-        if (obj != null && primitiveTypeToSchema.containsKey(obj.getClass())) {
-            return primitiveTypeToSchema.get(obj.getClass());
-        }
-        return null;
-    }
-
-    public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema, Object obj) {
-        if (pulsarSchema != null && pulsarSchema.getSchemaInfo() != null
-                && pulsarSchemaTypeTypeToKafkaSchema.containsKey(pulsarSchema.getSchemaInfo().getType())) {
-            return pulsarSchemaTypeTypeToKafkaSchema.get(pulsarSchema.getSchemaInfo().getType());
-        }
-
-        Schema result = getKafkaConnectSchemaForObject(obj);
-        if (result == null) {
-            throw new IllegalStateException("Unsupported kafka schema for Pulsar Schema "
-                    + (pulsarSchema == null || pulsarSchema.getSchemaInfo() == null
-                        ? "null"
-                        : pulsarSchema.getSchemaInfo().toString())
-                    + " object class "
-                    + (obj == null ? "null" : obj.getClass().getCanonicalName()));
-        }
-        return result;
-    }
-
-
     @SuppressWarnings("rawtypes")
     private SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
         final int partition = sourceRecord.getPartitionIndex().orElse(0);
@@ -301,8 +241,8 @@ public class KafkaConnectSink implements Sink<GenericObject> {
             KeyValue kv = (KeyValue) sourceRecord.getValue().getNativeObject();
             key = kv.getKey();
             value = kv.getValue();
-            keySchema = getKafkaConnectSchema(kvSchema.getKeySchema(), key);
-            valueSchema = getKafkaConnectSchema(kvSchema.getValueSchema(), value);
+            keySchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getKeySchema());
+            valueSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getValueSchema());
         } else {
             if (sourceRecord.getMessage().get().hasBase64EncodedKey()) {
                 key = sourceRecord.getMessage().get().getKeyBytes();
@@ -311,8 +251,8 @@ public class KafkaConnectSink implements Sink<GenericObject> {
                 key = sourceRecord.getKey().orElse(null);
                 keySchema = Schema.STRING_SCHEMA;
             }
+            valueSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(sourceRecord.getSchema());
             value = sourceRecord.getValue().getNativeObject();
-            valueSchema = getKafkaConnectSchema(sourceRecord.getSchema(), value);
         }
 
         long offset = sourceRecord.getRecordSequence()
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
new file mode 100644
index 0000000..c5bde39
--- /dev/null
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/PulsarSchemaToKafkaSchema.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka.connect.schema;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ExecutionError;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroData;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+@Slf4j
+public class PulsarSchemaToKafkaSchema {
+    private final static ImmutableMap<SchemaType, Schema> pulsarSchemaTypeToKafkaSchema;
+    private static final AvroData avroData = new AvroData(1000);
+    private static final Cache<byte[], Schema> schemaCache =
+            CacheBuilder.newBuilder().maximumSize(10000)
+                    .expireAfterAccess(30, TimeUnit.MINUTES).build();
+
+    static {
+        pulsarSchemaTypeToKafkaSchema = ImmutableMap.<SchemaType, Schema>builder()
+                .put(SchemaType.BOOLEAN, Schema.BOOLEAN_SCHEMA)
+                .put(SchemaType.INT8, Schema.INT8_SCHEMA)
+                .put(SchemaType.INT16, Schema.INT16_SCHEMA)
+                .put(SchemaType.INT32, Schema.INT32_SCHEMA)
+                .put(SchemaType.INT64, Schema.INT64_SCHEMA)
+                .put(SchemaType.FLOAT, Schema.FLOAT32_SCHEMA)
+                .put(SchemaType.DOUBLE, Schema.FLOAT64_SCHEMA)
+                .put(SchemaType.STRING, Schema.STRING_SCHEMA)
+                .put(SchemaType.BYTES, Schema.BYTES_SCHEMA)
+                .put(SchemaType.DATE, Date.SCHEMA)
+                .build();
+    }
+
+    // Parse json to shaded schema
+    private static org.apache.pulsar.kafka.shade.avro.Schema parseAvroSchema(String schemaJson) {
+        final org.apache.pulsar.kafka.shade.avro.Schema.Parser parser = new org.apache.pulsar.kafka.shade.avro.Schema.Parser();
+        parser.setValidateDefaults(false);
+        return parser.parse(schemaJson);
+    }
+
+    public static Schema getKafkaConnectSchema(org.apache.pulsar.client.api.Schema pulsarSchema) {
+        if (pulsarSchema != null && pulsarSchema.getSchemaInfo() != null) {
+            if (pulsarSchemaTypeToKafkaSchema.containsKey(pulsarSchema.getSchemaInfo().getType())) {
+                return pulsarSchemaTypeToKafkaSchema.get(pulsarSchema.getSchemaInfo().getType());
+            }
+
+            try {
+                return schemaCache.get(pulsarSchema.getSchemaInfo().getSchema(), () -> {
+                    if (pulsarSchema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
+                        KeyValueSchema kvSchema = (KeyValueSchema) pulsarSchema;
+                        return SchemaBuilder.map(getKafkaConnectSchema(kvSchema.getKeySchema()),
+                                                 getKafkaConnectSchema(kvSchema.getValueSchema()))
+                                    .build();
+                    }
+                    org.apache.pulsar.kafka.shade.avro.Schema avroSchema =
+                            parseAvroSchema(new String(pulsarSchema.getSchemaInfo().getSchema(), UTF_8));
+                    return avroData.toConnectSchema(avroSchema);
+                });
+            } catch (ExecutionException | UncheckedExecutionException | ExecutionError ee) {
+                throw logAndThrowOnUnsupportedSchema(pulsarSchema, "Failed to convert to Kafka Schema.", ee);
+            }
+        }
+
+        throw logAndThrowOnUnsupportedSchema(pulsarSchema, "Schema is required.", null);
+    }
+
+    private static IllegalStateException logAndThrowOnUnsupportedSchema(org.apache.pulsar.client.api.Schema pulsarSchema,
+                                                       String prefix,
+                                                       Throwable cause) {
+        String msg = prefix + " Pulsar Schema: "
+                + (pulsarSchema == null || pulsarSchema.getSchemaInfo() == null
+                ? "null" : pulsarSchema.getSchemaInfo().toString());
+        log.error(msg);
+        return new IllegalStateException(msg, cause);
+    }
+}
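
Because the converter recurses through KEY_VALUE schemas and falls back to (shaded) Avro for everything else, non-primitive combinations compose. A hedged sketch built on the classes in this patch, reusing the StructWithAnnotations test POJO defined in the test file below:

    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.client.impl.schema.AvroSchema;
    import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
    import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;

    public class KvStructSchemaDemo {
        public static void main(String[] args) {
            // KEY_VALUE of two Avro-backed schemas -> Kafka MAP whose key and value are STRUCTs
            Schema<?> pulsarKvSchema = KeyValueSchemaImpl.of(
                    AvroSchema.of(StructWithAnnotations.class),
                    AvroSchema.of(StructWithAnnotations.class));
            org.apache.kafka.connect.data.Schema kafkaSchema =
                    PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarKvSchema);
            System.out.println(kafkaSchema.type());               // MAP
            System.out.println(kafkaSchema.keySchema().type());   // STRUCT (via AvroData)
            System.out.println(kafkaSchema.valueSchema().type()); // STRUCT
        }
    }

Repeated lookups with the same schema instance are served from the Guava cache instead of re-parsing the Avro JSON; note the cache is keyed by the raw schema byte[], so it relies on the SchemaInfo returning the same array instance.
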
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index c388836..6f4f954 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -30,8 +30,10 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.client.util.MessageIdUtils;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.source.PulsarRecord;
@@ -47,6 +49,7 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -281,62 +284,74 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
 
     @Test
     public void bytesRecordSchemaTest() throws Exception {
-        recordSchemaTest("val".getBytes(StandardCharsets.US_ASCII), null, "val", "BYTES");
         recordSchemaTest("val".getBytes(StandardCharsets.US_ASCII), Schema.BYTES, "val", "BYTES");
     }
 
     @Test
     public void stringRecordSchemaTest() throws Exception {
-        recordSchemaTest("val", null, "val", "STRING");
         recordSchemaTest("val", Schema.STRING, "val", "STRING");
     }
 
     @Test
     public void booleanRecordSchemaTest() throws Exception {
-        recordSchemaTest(true, null, true, "BOOLEAN");
         recordSchemaTest(true, Schema.BOOL, true, "BOOLEAN");
     }
 
     @Test
     public void byteRecordSchemaTest() throws Exception {
         // int 1 is coming back from ObjectMapper
-        recordSchemaTest((byte)1, null, 1, "INT8");
         recordSchemaTest((byte)1, Schema.INT8, 1, "INT8");
     }
 
     @Test
     public void shortRecordSchemaTest() throws Exception {
         // int 1 is coming back from ObjectMapper
-        recordSchemaTest((short)1, null, 1, "INT16");
         recordSchemaTest((short)1, Schema.INT16, 1, "INT16");
     }
 
     @Test
     public void integerRecordSchemaTest() throws Exception {
-        recordSchemaTest(Integer.MAX_VALUE, null, Integer.MAX_VALUE, "INT32");
         recordSchemaTest(Integer.MAX_VALUE, Schema.INT32, Integer.MAX_VALUE, "INT32");
     }
 
     @Test
     public void longRecordSchemaTest() throws Exception {
-        recordSchemaTest(Long.MAX_VALUE, null, Long.MAX_VALUE, "INT64");
         recordSchemaTest(Long.MAX_VALUE, Schema.INT64, Long.MAX_VALUE, "INT64");
     }
 
     @Test
     public void floatRecordSchemaTest() throws Exception {
         // 1.0d is coming back from ObjectMapper
-        recordSchemaTest(1.0f, null, 1.0d, "FLOAT32");
         recordSchemaTest(1.0f, Schema.FLOAT, 1.0d, "FLOAT32");
     }
 
     @Test
     public void doubleRecordSchemaTest() throws Exception {
-        recordSchemaTest(Double.MAX_VALUE, null, Double.MAX_VALUE, "FLOAT64");
         recordSchemaTest(Double.MAX_VALUE, Schema.DOUBLE, Double.MAX_VALUE, "FLOAT64");
     }
 
     @Test
+    public void jsonSchemaTest() throws Exception {
+        JSONSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> jsonSchema = JSONSchema
+                .of(SchemaDefinition.<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations>builder()
+                        .withPojo(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class)
+                        .withAlwaysAllowNull(false)
+                        .build());
+        PulsarSchemaToKafkaSchemaTest.StructWithAnnotations obj = new PulsarSchemaToKafkaSchemaTest.StructWithAnnotations();
+        obj.setField1(10);
+        obj.setField2("test");
+        obj.setField3(100L);
+
+        Map<String, Object> expected = new LinkedHashMap<>();
+        expected.put("field1", 10);
+        expected.put("field2", "test");
+        // integer is coming back from ObjectMapper
+        expected.put("field3", 100);
+
+        recordSchemaTest(obj, jsonSchema, expected, "STRUCT");
+    }
+
+    @Test
     public void unknownRecordSchemaTest() throws Exception {
         Object obj = new Object();
         props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
new file mode 100644
index 0000000..9075dd9
--- /dev/null
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/PulsarSchemaToKafkaSchemaTest.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.kafka.connect;
+
+import com.google.common.collect.Lists;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.reflect.AvroDefault;
+import org.apache.avro.reflect.Nullable;
+import org.apache.kafka.connect.data.Date;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
+import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
+import org.testng.annotations.Test;
+
+import java.util.List;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Test the conversion of Pulsar Schema to Kafka Schema.
+ */
+@Slf4j
+public class PulsarSchemaToKafkaSchemaTest {
+
+    static final List<String> STRUCT_FIELDS = Lists.newArrayList("field1", "field2", "field3");
+
+    @Data
+    static class StructWithAnnotations {
+        int field1;
+        @Nullable
+        String field2;
+        @AvroDefault("\"1000\"")
+        Long field3;
+    }
+
+    @Test
+    public void bytesSchemaTest() {
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.BYTES);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.BYTES);
+
+        kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.BYTEBUFFER);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.BYTES);
+    }
+
+    @Test
+    public void stringSchemaTest() {
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.STRING);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.STRING);
+    }
+
+    @Test
+    public void booleanSchemaTest() {
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.BOOL);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.BOOLEAN);
+    }
+
+    @Test
+    public void int8SchemaTest() {
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT8);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.INT8);
+    }
+
+    @Test
+    public void int16SchemaTest() {
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT16);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.INT16);
+    }
+
+    @Test
+    public void int32SchemaTest() {
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT32);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.INT32);
+    }
+
+    @Test
+    public void int64SchemaTest() {
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INT64);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.INT64);
+    }
+
+    @Test
+    public void float32SchemaTest() {
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.FLOAT);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.FLOAT32);
+    }
+
+    @Test
+    public void float64SchemaTest() {
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.DOUBLE);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.FLOAT64);
+    }
+
+    @Test
+    public void kvBytesSchemaTest() {
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.KV_BYTES());
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.MAP);
+        assertEquals(kafkaSchema.keySchema().type(), org.apache.kafka.connect.data.Schema.Type.BYTES);
+        assertEquals(kafkaSchema.valueSchema().type(), org.apache.kafka.connect.data.Schema.Type.BYTES);
+    }
+
+    @Test
+    public void kvBytesIntSchemaTests() {
+        Schema pulsarKvSchema = KeyValueSchemaImpl.of(Schema.STRING, Schema.INT64);
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarKvSchema);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.MAP);
+        assertEquals(kafkaSchema.keySchema().type(), org.apache.kafka.connect.data.Schema.Type.STRING);
+        assertEquals(kafkaSchema.valueSchema().type(), org.apache.kafka.connect.data.Schema.Type.INT64);
+    }
+
+    @Test
+    public void avroSchemaTest() {
+        AvroSchema<StructWithAnnotations> pulsarAvroSchema = AvroSchema.of(StructWithAnnotations.class);
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(pulsarAvroSchema);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.STRUCT);
+        assertEquals(kafkaSchema.fields().size(), STRUCT_FIELDS.size());
+        for (String name: STRUCT_FIELDS) {
+            assertEquals(kafkaSchema.field(name).name(), name);
+        }
+    }
+
+    @Test
+    public void jsonSchemaTest() {
+        JSONSchema<StructWithAnnotations> jsonSchema = JSONSchema
+                .of(SchemaDefinition.<StructWithAnnotations>builder()
+                .withPojo(StructWithAnnotations.class)
+                .withAlwaysAllowNull(false)
+                .build());
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(jsonSchema);
+        assertEquals(kafkaSchema.type(), org.apache.kafka.connect.data.Schema.Type.STRUCT);
+        assertEquals(kafkaSchema.fields().size(), STRUCT_FIELDS.size());
+        for (String name: STRUCT_FIELDS) {
+            assertEquals(kafkaSchema.field(name).name(), name);
+        }
+    }
+
+    @Test
+    public void dateSchemaTest() {
+        org.apache.kafka.connect.data.Schema kafkaSchema =
+                PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.DATE);
+        assertEquals(kafkaSchema.type(), Date.SCHEMA.type());
+    }
+
+    // The schema types below are not supported:
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void timeSchemaTest() {
+        PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.TIME);
+    }
+
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void timestampSchemaTest() {
+        PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.TIMESTAMP);
+    }
+
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void instantSchemaTest() {
+        PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.INSTANT);
+    }
+
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void localDateSchemaTest() {
+        PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.LOCAL_DATE);
+    }
+
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void localTimeSchemaTest() {
+        PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.LOCAL_TIME);
+    }
+
+    @Test(expectedExceptions = IllegalStateException.class)
+    public void localDatetimeSchemaTest() {
+        PulsarSchemaToKafkaSchema.getKafkaConnectSchema(Schema.LOCAL_DATE_TIME);
+    }
+
+}

[pulsar] 02/08: [Issue 10860][pulsar-metadata] Ensure metadata cache consistency across brokers (#10862)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d280fad2b9fbfaba21ebd9d68aafcba36e10a005
Author: Surinder Singh <su...@gmail.com>
AuthorDate: Tue Jun 8 14:09:47 2021 -0700

    [Issue 10860][pulsar-metadata] Ensure metadata cache consistency across brokers (#10862)
    
    * Added metadata cache test to simulate multi broker cache
    
    * fix create and delete ops on cache
    
    1. During create we should add a watch for the path in ZooKeeper. Without this
    we will not be notified if the znode is changed on another broker.
    
    2. Similarly, when deleting, the cache entry should be invalidated, but we
    shouldn't add an empty entry to the cache: the znode could be created again
    on some other broker, in which case we need to go and fetch the entry from
    ZooKeeper, and a cached empty entry would prevent that.
    
    * Address review comments
    
    Also add a small delay in the test to allow notifications to propagate to the
    other caches. Without this, the tests occasionally fail.
    
    Co-authored-by: Surinder Singh <su...@splunk.com>
    (cherry picked from commit 798b34fd2f4d4998ef2b7836dcd0d7f28883338b)
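
    A minimal usage sketch (not part of the patch) of the cross-broker behavior
    this fix restores, built from the APIs exercised in the test below; the
    zkUrl variable and the MyClass POJO are placeholders:

        import org.apache.pulsar.metadata.api.MetadataCache;
        import org.apache.pulsar.metadata.api.MetadataStore;
        import org.apache.pulsar.metadata.api.MetadataStoreConfig;
        import org.apache.pulsar.metadata.api.MetadataStoreFactory;

        // Two stores standing in for two brokers (zkUrl is a placeholder).
        MetadataStore broker1 = MetadataStoreFactory.create(zkUrl, MetadataStoreConfig.builder().build());
        MetadataStore broker2 = MetadataStoreFactory.create(zkUrl, MetadataStoreConfig.builder().build());
        MetadataCache<MyClass> cache1 = broker1.getMetadataCache(MyClass.class);
        MetadataCache<MyClass> cache2 = broker2.getMetadataCache(MyClass.class);

        // Create through cache1; the watch now registered during create means
        // cache1 is notified when the znode later changes via another store.
        cache1.create("/test-key1", new MyClass("demo", 1)).join();

        // Delete through cache2; cache1's entry is invalidated rather than
        // replaced with a cached Optional.empty(), so a later get() refetches
        // from ZooKeeper if the znode is recreated on some other broker.
        cache2.delete("/test-key1").join();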
---
 .../metadata/cache/impl/MetadataCacheImpl.java     | 21 ++++---
 .../apache/pulsar/metadata/MetadataCacheTest.java  | 71 ++++++++++++++++++++++
 2 files changed, 85 insertions(+), 7 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
index 9ad563f..7af568d 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java
@@ -38,6 +38,7 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 
 import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.pulsar.metadata.api.CacheGetResult;
 import org.apache.pulsar.metadata.api.MetadataCache;
@@ -50,6 +51,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
 import org.apache.pulsar.metadata.api.Notification;
 import org.apache.pulsar.metadata.api.Stat;
 
+@Slf4j
 public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notification> {
 
     private static final long CACHE_REFRESH_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5);
@@ -209,8 +211,17 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica
         store.put(path, content, Optional.of(-1L))
                 .thenAccept(stat -> {
                     // Make sure we have the value cached before the operation is completed
-                    objCache.put(path, FutureUtils.value(Optional.of(new CacheGetResult<>(value, stat))));
-                    future.complete(null);
+                    // In addition to caching the value, we need to add a watch on the path,
+                    // so when/if it changes on any other node, we are notified and we can
+                    // update the cache
+                    objCache.get(path).whenComplete( (stat2, ex) -> {
+                        if (ex == null) {
+                            future.complete(null);
+                        } else {
+                            log.error("Exception while getting path {}", path, ex);
+                            future.completeExceptionally(ex.getCause());
+                        }
+                    });
                 }).exceptionally(ex -> {
                     if (ex.getCause() instanceof BadVersionException) {
                         // Use already exists exception to provide more self-explanatory error message
@@ -226,11 +237,7 @@ public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notifica
 
     @Override
     public CompletableFuture<Void> delete(String path) {
-        return store.delete(path, Optional.empty())
-                .thenAccept(v -> {
-                    // Mark in the cache that the object was removed
-                    objCache.put(path, FutureUtils.value(Optional.empty()));
-                });
+        return store.delete(path, Optional.empty());
     }
 
     @Override
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
index f1b4379..4d83270 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataCacheTest.java
@@ -28,6 +28,8 @@ import static org.testng.Assert.fail;
 import com.fasterxml.jackson.core.type.TypeReference;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.TreeMap;
@@ -50,6 +52,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreException.ContentDeserializat
 import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
 import org.apache.pulsar.metadata.api.MetadataStoreFactory;
 import org.apache.pulsar.metadata.api.Stat;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 public class MetadataCacheTest extends BaseMetadataStoreTest {
@@ -90,6 +93,74 @@ public class MetadataCacheTest extends BaseMetadataStoreTest {
         }
     }
 
+    @DataProvider(name = "zk")
+    public Object[][] zkimplementations() {
+        return new Object[][] {
+            { "ZooKeeper", zks.getConnectionString() },
+        };
+    }
+
+    @Test(dataProvider = "zk")
+    public void crossStoreUpdates(String provider, String url) throws Exception {
+        @Cleanup
+        MetadataStore store1 = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build());
+
+        @Cleanup
+        MetadataStore store2 = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build());
+
+        @Cleanup
+        MetadataStore store3 = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build());
+
+        MetadataCache<MyClass> objCache1 = store1.getMetadataCache(MyClass.class);
+        MetadataCache<MyClass> objCache2 = store2.getMetadataCache(MyClass.class);
+        MetadataCache<MyClass> objCache3 = store3.getMetadataCache(MyClass.class);
+
+        List<MetadataCache<MyClass>> allCaches = new ArrayList<>();
+        allCaches.add(objCache1);
+        allCaches.add(objCache2);
+        allCaches.add(objCache3);
+
+        // Add on one cache and remove from another
+        multiStoreAddDelete(allCaches, 0, 1, "add cache0 del cache1");
+        // retry same order to rule out any stale state
+        multiStoreAddDelete(allCaches, 0, 1, "add cache0 del cache1");
+        // Reverse the operations
+        multiStoreAddDelete(allCaches, 1, 0, "add cache1 del cache0");
+        // Ensure that working on same cache continues to work.
+        multiStoreAddDelete(allCaches, 1, 1, "add cache1 del cache1");
+    }
+
+    private void multiStoreAddDelete(List<MetadataCache<MyClass>> caches, int addOn, int delFrom, String testName) throws InterruptedException {
+        MetadataCache<MyClass> addCache = caches.get(addOn);
+        MetadataCache<MyClass> delCache = caches.get(delFrom);
+
+        String key1 = "/test-key1";
+        assertEquals(addCache.getIfCached(key1), Optional.empty());
+
+        MyClass value1 = new MyClass(testName, 1);
+
+        addCache.create(key1, value1).join();
+        // allow time for changes to propagate to the other caches
+        Thread.sleep(100);
+        for (MetadataCache<MyClass> cache: caches) {
+            if (cache == addCache) {
+                assertEquals(cache.getIfCached(key1), Optional.of(value1));
+            }
+            assertEquals(cache.get(key1).join(), Optional.of(value1));
+            assertEquals(cache.getIfCached(key1), Optional.of(value1));
+        }
+
+        delCache.delete(key1).join();
+
+        // allow time for changes to propagate to the other caches
+        Thread.sleep(100);
+        // The entry should get removed from all caches
+        for (MetadataCache<MyClass> cache: caches) {
+            assertEquals(cache.getIfCached(key1), Optional.empty());
+            assertEquals(cache.get(key1).join(), Optional.empty());
+        }
+    }
+
     @Test(dataProvider = "impl")
     public void insertionDeletionWitGenericType(String provider, String url) throws Exception {
         @Cleanup

[pulsar] 01/08: pulsar-perf: print total number of messages (#10765)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 263df0248ec339b92a3184f2efae4da79828c5c3
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Tue Jun 8 10:28:04 2021 +0200

    pulsar-perf: print total number of messages (#10765)
    
    
    (cherry picked from commit d93b775fb61af7c84d6960918bf9fa95b9b1d245)
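
    A small standalone sketch (not part of the patch) of the new counter
    formatting, assuming PaddingDecimalFormat pads its output to the given
    minimum width, as its usage in this patch suggests:

        import java.text.DecimalFormat;
        import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;

        public class TotalFormatDemo {
            public static void main(String[] args) {
                // Pads to a minimum width of 7 characters, so successive
                // periodic report lines keep their columns aligned.
                DecimalFormat intFormat = new PaddingDecimalFormat("0", 7);
                System.out.println("Throughput received: " + intFormat.format(12345L) + " msg");
            }
        }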
---
 .../main/java/org/apache/pulsar/testclient/PerformanceConsumer.java | 6 +++++-
 .../main/java/org/apache/pulsar/testclient/PerformanceProducer.java | 6 ++++--
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 12cc29a..c755cc0 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.stats.JvmMetrics;
+import org.apache.pulsar.testclient.utils.PaddingDecimalFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,6 +61,7 @@ import com.google.common.util.concurrent.RateLimiter;
 public class PerformanceConsumer {
     private static final LongAdder messagesReceived = new LongAdder();
     private static final LongAdder bytesReceived = new LongAdder();
+    private static final DecimalFormat intFormat = new PaddingDecimalFormat("0", 7);
     private static final DecimalFormat dec = new DecimalFormat("0.000");
 
     private static final LongAdder totalMessagesReceived = new LongAdder();
@@ -401,13 +403,15 @@ public class PerformanceConsumer {
 
             long now = System.nanoTime();
             double elapsed = (now - oldTime) / 1e9;
+            long total = totalMessagesReceived.sum();
             double rate = messagesReceived.sumThenReset() / elapsed;
             double throughput = bytesReceived.sumThenReset() / elapsed * 8 / 1024 / 1024;
 
             reportHistogram = recorder.getIntervalHistogram(reportHistogram);
 
             log.info(
-                    "Throughput received: {}  msg/s -- {} Mbit/s --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
+                    "Throughput received: {} msg --- {}  msg/s -- {} Mbit/s --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
+                    intFormat.format(total),
                     dec.format(rate), dec.format(throughput), dec.format(reportHistogram.getMean()),
                     reportHistogram.getValueAtPercentile(50), reportHistogram.getValueAtPercentile(95),
                     reportHistogram.getValueAtPercentile(99), reportHistogram.getValueAtPercentile(99.9),
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 846bc96..f198a2d 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -453,7 +453,7 @@ public class PerformanceProducer {
 
             long now = System.nanoTime();
             double elapsed = (now - oldTime) / 1e9;
-
+            long total = totalMessagesSent.sum();
             double rate = messagesSent.sumThenReset() / elapsed;
             double failureRate = messagesFailed.sumThenReset() / elapsed;
             double throughput = bytesSent.sumThenReset() / elapsed / 1024 / 1024 * 8;
@@ -461,7 +461,8 @@ public class PerformanceProducer {
             reportHistogram = recorder.getIntervalHistogram(reportHistogram);
 
             log.info(
-                    "Throughput produced: {}  msg/s --- {} Mbit/s --- failure {} msg/s --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
+                    "Throughput produced: {} msg --- {} msg/s --- {} Mbit/s --- failure {} msg/s --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
+                    intFormat.format(total),
                     throughputFormat.format(rate), throughputFormat.format(throughput),
                     throughputFormat.format(failureRate),
                     dec.format(reportHistogram.getMean() / 1000.0),
@@ -714,6 +715,7 @@ public class PerformanceProducer {
 
     static final DecimalFormat throughputFormat = new PaddingDecimalFormat("0.0", 8);
     static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
+    static final DecimalFormat intFormat = new PaddingDecimalFormat("0", 7);
     static final DecimalFormat totalFormat = new DecimalFormat("0.000");
     private static final Logger log = LoggerFactory.getLogger(PerformanceProducer.class);
 

[pulsar] 06/08: Make KeyValueSchema an interface visible in the public Schema API (#10888)

Posted by pe...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1ecadd482efafed9c103d70cbddbe9ffb4931bec
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Thu Jun 10 11:56:38 2021 +0200

    Make KeyValueSchema an interface visible in the public Schema API (#10888)
    
    * Make KeyValueSchema an interface visible in the public Schema API
    - allow users of pulsar-client-api to use KeyValueSchema
    - move KeyValueSchema implementation to KeyValueSchemaImpl
    - introduce a new interface KeyValueSchema
    
    (cherry picked from commit 18f2f4a9c1dab7eec5c7c9590b76aca17ee1ead8)
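
    A usage sketch (not part of the patch): application code can now depend only
    on pulsar-client-api and still inspect the key/value component schemas. The
    accessor names below mirror those used on the implementation elsewhere in
    this change set and are assumed to be exposed by the new interface:

        import org.apache.pulsar.client.api.Schema;
        import org.apache.pulsar.client.api.schema.KeyValueSchema;
        import org.apache.pulsar.common.schema.KeyValue;

        // Build a key/value schema through the public API only.
        Schema<KeyValue<String, Long>> schema = Schema.KeyValue(Schema.STRING, Schema.INT64);

        // The schema can be inspected via the new public interface, e.g. after
        // obtaining a reader schema from a received message.
        KeyValueSchema<String, Long> kvSchema = (KeyValueSchema<String, Long>) schema;
        Schema<String> keySchema = kvSchema.getKeySchema();
        Schema<Long> valueSchema = kvSchema.getValueSchema();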
---
 .../apache/pulsar/broker/service/KeyValueTest.java |  16 +--
 .../pulsar/broker/service/NullValueTest.java       |  10 +-
 .../KeyValueSchemaCompatibilityCheckTest.java      | 118 ++++++++++-----------
 .../apache/pulsar/client/api/SimpleSchemaTest.java |  16 +--
 .../java/org/apache/pulsar/schema/SchemaTest.java  |  12 +--
 .../pulsar/client/api/schema/KeyValueSchema.java   |  54 ++++++++++
 .../client/internal/DefaultImplementation.java     |   8 +-
 .../org/apache/pulsar/client/impl/MessageImpl.java |  12 +--
 .../client/impl/TypedMessageBuilderImpl.java       |  10 +-
 .../client/impl/schema/AutoConsumeSchema.java      |   2 +-
 .../client/impl/schema/AutoProduceBytesSchema.java |   6 +-
 ...KeyValueSchema.java => KeyValueSchemaImpl.java} |  63 ++++++++---
 .../client/impl/schema/KeyValueSchemaTest.java     |  48 ++++-----
 .../SupportVersioningKeyValueSchemaTest.java       |  12 +--
 .../impl/schema/generic/GenericSchemaImplTest.java |   6 +-
 .../impl/schema/generic/GenericSchemaTest.java     |   6 +-
 .../pulsar/functions/instance/SinkRecord.java      |   4 +-
 .../apache/pulsar/functions/sink/PulsarSink.java   |   2 +-
 .../pulsar/io/kafka/connect/KafkaConnectSink.java  |   3 +-
 .../pulsar/sql/presto/TestPulsarRecordCursor.java  |   8 +-
 .../integration/io/TestGenericObjectSink.java      |   2 +-
 .../tests/integration/io/PulsarIOTestRunner.java   |   7 +-
 .../tests/integration/presto/TestBasicPresto.java  |   8 +-
 23 files changed, 250 insertions(+), 183 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/KeyValueTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/KeyValueTest.java
index 0faee61..2796bbb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/KeyValueTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/KeyValueTest.java
@@ -20,33 +20,23 @@ package org.apache.pulsar.broker.service;
 
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageRouter;
-import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
-import org.apache.pulsar.client.api.TopicMetadata;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericSchema;
 import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
 import org.apache.pulsar.client.api.schema.SchemaBuilder;
-import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
 import org.apache.pulsar.common.schema.KeyValue;
-import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaType;
-import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import java.util.concurrent.CompletableFuture;
-
 import static org.testng.Assert.assertEquals;
 
 /**
@@ -83,7 +73,7 @@ public class KeyValueTest extends BrokerTestBase {
 
         @Cleanup
         Producer<KeyValue<GenericRecord, GenericRecord>> producer = pulsarClient
-                .newProducer(KeyValueSchema.of(schema, schema))
+                .newProducer(KeyValueSchemaImpl.of(schema, schema))
                 .topic(topic)
                 .create();
 
@@ -91,7 +81,7 @@ public class KeyValueTest extends BrokerTestBase {
 
         @Cleanup
         Consumer<KeyValue<GenericRecord, GenericRecord>> consumer = pulsarClient
-                .newConsumer(KeyValueSchema.of(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME()))
+                .newConsumer(KeyValueSchemaImpl.of(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME()))
                 .topic(topic)
                 .subscriptionName("test")
                 .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
index 8f7513e..4cb476e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NullValueTest.java
@@ -30,7 +30,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TopicMetadata;
-import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.testng.Assert;
@@ -173,14 +173,14 @@ public class NullValueTest extends BrokerTestBase {
 
         @Cleanup
         Producer<KeyValue<String, String>> producer = pulsarClient
-                .newProducer(KeyValueSchema.of(Schema.STRING, Schema.STRING))
+                .newProducer(KeyValueSchemaImpl.of(Schema.STRING, Schema.STRING))
                 .topic(topic)
                 .messageRoutingMode(MessageRoutingMode.SinglePartition)
                 .create();
 
         @Cleanup
         Consumer<KeyValue<String, String>> consumer = pulsarClient
-                .newConsumer(KeyValueSchema.of(Schema.STRING, Schema.STRING))
+                .newConsumer(KeyValueSchemaImpl.of(Schema.STRING, Schema.STRING))
                 .topic(topic)
                 .subscriptionName("test")
                 .subscribe();
@@ -220,7 +220,7 @@ public class NullValueTest extends BrokerTestBase {
 
         @Cleanup
         Producer<KeyValue<String, String>> producer = pulsarClient
-                .newProducer(KeyValueSchema.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED))
+                .newProducer(KeyValueSchemaImpl.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED))
                 .topic(topic)
                 // The default SinglePartition routing mode will be affected by the key when the KeyValueEncodingType is
                 // SEPARATED so we need to define a message router to guarantee the message order.
@@ -234,7 +234,7 @@ public class NullValueTest extends BrokerTestBase {
 
         @Cleanup
         Consumer<KeyValue<String, String>> consumer = pulsarClient
-                .newConsumer(KeyValueSchema.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED))
+                .newConsumer(KeyValueSchemaImpl.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED))
                 .topic(topic)
                 .subscriptionName("test")
                 .subscribe();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheckTest.java
index d54dfb8..78cb9af 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/KeyValueSchemaCompatibilityCheckTest.java
@@ -24,7 +24,7 @@ import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.client.impl.schema.StringSchema;
-import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -67,9 +67,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
         properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
     }
 
@@ -81,9 +81,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
         properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
     }
 
@@ -95,9 +95,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
         properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.BACKWARD));
     }
 
@@ -109,9 +109,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
         properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.BACKWARD));
     }
 
@@ -123,9 +123,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
         properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
     }
 
@@ -137,9 +137,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
         properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
     }
 
@@ -151,9 +151,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
         properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
     }
 
@@ -165,9 +165,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
         properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
     }
 
@@ -179,9 +179,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
         properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.BACKWARD));
     }
 
@@ -193,9 +193,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
         properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.BACKWARD));
     }
 
@@ -207,9 +207,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
         properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
     }
 
@@ -221,9 +221,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
         properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
     }
 
@@ -235,9 +235,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
         properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
     }
 
@@ -249,9 +249,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
         properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
     }
 
@@ -263,9 +263,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
         properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.BACKWARD));
     }
 
@@ -277,9 +277,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
         properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.BACKWARD));
     }
 
@@ -291,9 +291,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
         properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
     }
 
@@ -305,9 +305,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         properties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
         properties.put("value.schema.type", String.valueOf(SchemaType.JSON));
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
     }
 
@@ -319,9 +319,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
         properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
     }
 
@@ -333,9 +333,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
         properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
     }
 
@@ -347,9 +347,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
         properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.BACKWARD));
     }
 
@@ -361,9 +361,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
         properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.BACKWARD));
     }
 
@@ -376,9 +376,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
         properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
     }
 
@@ -390,9 +390,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         properties.put("key.schema.type", String.valueOf(SchemaType.JSON));
         properties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(barSchema, fooSchema).getSchemaInfo().getSchema()).props(properties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(properties).build();
         Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
     }
 
@@ -407,9 +407,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         toProperties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
         toProperties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(fromProperties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(fromProperties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(barSchema, barSchema).getSchemaInfo().getSchema()).props(toProperties).build();
+                .data(KeyValueSchemaImpl.of(barSchema, barSchema).getSchemaInfo().getSchema()).props(toProperties).build();
         Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
     }
 
@@ -424,9 +424,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         toProperties.put("key.schema.type", String.valueOf(SchemaType.JSON));
         toProperties.put("value.schema.type", String.valueOf(SchemaType.JSON));
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(fromProperties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(fromProperties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, fooSchema).getSchemaInfo().getSchema()).props(toProperties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, fooSchema).getSchemaInfo().getSchema()).props(toProperties).build();
         Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FORWARD));
     }
 
@@ -441,9 +441,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         toProperties.put("key.schema.type", String.valueOf(SchemaType.AVRO));
         toProperties.put("value.schema.type", String.valueOf(SchemaType.AVRO));
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(fromProperties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(fromProperties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(toProperties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(toProperties).build();
         Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
     }
 
@@ -454,9 +454,9 @@ public class KeyValueSchemaCompatibilityCheckTest {
         Map<String, String> fromProperties = Maps.newHashMap();
         Map<String, String> toProperties = Maps.newHashMap();
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(fromProperties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(fromProperties).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(toProperties).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).props(toProperties).build();
         Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.FULL));
     }
 
@@ -468,7 +468,7 @@ public class KeyValueSchemaCompatibilityCheckTest {
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.STRING)
                 .data(stringSchema.getSchemaInfo().getSchema()).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).build();
         Assert.assertTrue(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE));
     }
 
@@ -480,7 +480,7 @@ public class KeyValueSchemaCompatibilityCheckTest {
         SchemaData fromSchemaData = SchemaData.builder().type(SchemaType.STRING)
                 .data(stringSchema.getSchemaInfo().getSchema()).build();
         SchemaData toSchemaData = SchemaData.builder().type(SchemaType.KEY_VALUE)
-                .data(KeyValueSchema.of(fooSchema, barSchema).getSchemaInfo().getSchema()).build();
+                .data(KeyValueSchemaImpl.of(fooSchema, barSchema).getSchemaInfo().getSchema()).build();
         Assert.assertFalse(checkers.get(SchemaType.KEY_VALUE).isCompatible(fromSchemaData, toSchemaData, SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE));
     }
 
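The key.schema.type and value.schema.type properties exercised throughout these tests are the convention by which a combined KEY_VALUE SchemaData advertises the types of its two components to the compatibility checker. A minimal sketch of that property map, using plain JDK collections rather than the Guava helper the tests use:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.pulsar.common.schema.SchemaType;

    public class KeyValueSchemaProps {
        public static void main(String[] args) {
            // Component schema types travel as string-valued properties
            // alongside the combined KEY_VALUE schema data.
            Map<String, String> props = new HashMap<>();
            props.put("key.schema.type", String.valueOf(SchemaType.AVRO));
            props.put("value.schema.type", String.valueOf(SchemaType.JSON));
            props.forEach((k, v) -> System.out.println(k + " = " + v));
        }
    }
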
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
index 16a16c3..57bde6b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java
@@ -36,7 +36,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.Schema.Parser;
 import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.common.schema.LongSchemaVersion;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
@@ -618,7 +618,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
                 assertEquals(data.getValue().getField("i"), i * 1000);
                 c0.acknowledge(wrapper);
                 Schema<?> schema = wrapper.getReaderSchema().get();
-                KeyValueSchema keyValueSchema = (KeyValueSchema) schema;
+                KeyValueSchemaImpl keyValueSchema = (KeyValueSchemaImpl) schema;
                 assertEquals(SchemaType.AVRO, keyValueSchema.getKeySchema().getSchemaInfo().getType());
                 assertEquals(SchemaType.AVRO, keyValueSchema.getValueSchema().getSchemaInfo().getType());
                 assertNotNull(schema.getSchemaInfo());
@@ -630,7 +630,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
                 assertEquals(data.getValue().getKey().getField("i"), i * 100);
                 assertEquals(data.getValue().getValue().getField("i"), i * 1000);
                 c1.acknowledge(data);
-                KeyValueSchema keyValueSchema = (KeyValueSchema) data.getReaderSchema().get();
+                KeyValueSchemaImpl keyValueSchema = (KeyValueSchemaImpl) data.getReaderSchema().get();
                 assertNotNull(keyValueSchema.getKeySchema());
                 assertNotNull(keyValueSchema.getValueSchema());
             }
@@ -642,7 +642,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
                 assertEquals(data.getValue().getKey().i, i * 100);
                 assertEquals(data.getValue().getValue().i, i * 1000);
                 c2.acknowledge(data);
-                KeyValueSchema keyValueSchema = (KeyValueSchema) data.getReaderSchema().get();
+                KeyValueSchemaImpl keyValueSchema = (KeyValueSchemaImpl) data.getReaderSchema().get();
                 assertNotNull(keyValueSchema.getKeySchema());
                 assertNotNull(keyValueSchema.getValueSchema());
             }
@@ -654,7 +654,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
                 assertEquals(data.getValue().getKey().getField("i"), i * 100);
                 assertEquals(data.getValue().getValue().i, i * 1000);
                 c3.acknowledge(data);
-                KeyValueSchema keyValueSchema = (KeyValueSchema) data.getReaderSchema().get();
+                KeyValueSchemaImpl keyValueSchema = (KeyValueSchemaImpl) data.getReaderSchema().get();
                 assertNotNull(keyValueSchema.getKeySchema());
                 assertNotNull(keyValueSchema.getValueSchema());
             }
@@ -812,7 +812,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
                 assertEquals(data.getKey().getField("i"), i * 100);
                 assertEquals(data.getValue().getField("i"), i * 1000);
                 c0.acknowledge(wrapper);
-                KeyValueSchema keyValueSchema = (KeyValueSchema) wrapper.getReaderSchema().get();
+                KeyValueSchemaImpl keyValueSchema = (KeyValueSchemaImpl) wrapper.getReaderSchema().get();
                 assertNotNull(keyValueSchema.getKeySchema());
                 assertNotNull(keyValueSchema.getValueSchema());
                 assertTrue(keyValueSchema.getKeySchema().getSchemaInfo().getSchemaDefinition().contains("V1Data"));
@@ -848,7 +848,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
                     assertEquals(data.getValue().getField("i"), i * 1000);
                     assertEquals(data.getKey().getField("j"), i);
                     assertEquals(data.getValue().getField("j"), i * 20);
-                    KeyValueSchema keyValueSchema = (KeyValueSchema) wrapper.getReaderSchema().get();
+                    KeyValueSchemaImpl keyValueSchema = (KeyValueSchemaImpl) wrapper.getReaderSchema().get();
                     assertNotNull(keyValueSchema.getKeySchema());
                     assertNotNull(keyValueSchema.getValueSchema());
                     assertTrue(keyValueSchema.getKeySchema().getSchemaInfo().getSchemaDefinition().contains("V2Data"));
@@ -987,7 +987,7 @@ public class SimpleSchemaTest extends ProducerConsumerBase {
             p.send(new KeyValue<>(null, null));
 
             Message<GenericRecord> wrapper = c0.receive();
-            assertEquals(encodingType, ((KeyValueSchema) wrapper.getReaderSchema().get()).getKeyValueEncodingType());
+            assertEquals(encodingType, ((KeyValueSchemaImpl) wrapper.getReaderSchema().get()).getKeyValueEncodingType());
             KeyValue<GenericRecord, GenericRecord> data1 = (KeyValue<GenericRecord, GenericRecord>) wrapper.getValue().getNativeObject();
             assertEquals(1, data1.getKey().getField("i"));
             assertEquals(2, data1.getValue().getField("i"));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 62f18ec..d97b989 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -60,15 +60,13 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
-import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
@@ -280,7 +278,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
         Schemas.PersonTwo personConsume = message.getValue().getValue();
         assertEquals(personConsume.getName(), "Tom");
         assertEquals(personConsume.getId(), 1);
-        KeyValueSchema schema = (KeyValueSchema) message.getReaderSchema().get();
+        KeyValueSchemaImpl schema = (KeyValueSchemaImpl) message.getReaderSchema().get();
         log.info("the-schema {}", schema);
         assertEquals(personTwoSchema.getSchemaInfo(), schema.getValueSchema().getSchemaInfo());
         org.apache.avro.Schema nativeSchema = (org.apache.avro.Schema) schema.getValueSchema().getNativeSchema().get();
@@ -290,7 +288,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
         // verify that with AUTO_CONSUME we can access the original schema
         // and the Native AVRO schema
         Message<?> message2 = consumer2.receive();
-        KeyValueSchema schema2 = (KeyValueSchema) message2.getReaderSchema().get();
+        KeyValueSchemaImpl schema2 = (KeyValueSchemaImpl) message2.getReaderSchema().get();
         log.info("the-schema {}", schema2);
         assertEquals(personTwoSchema.getSchemaInfo(), schema2.getValueSchema().getSchemaInfo());
         org.apache.avro.Schema nativeSchema2 = (org.apache.avro.Schema) schema.getValueSchema().getNativeSchema().get();
@@ -608,8 +606,8 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
 
         Schema<?> schema = message.getReaderSchema().get();
         Schema<?> schemaFromGenericRecord = message.getReaderSchema().get();
-        KeyValueSchema keyValueSchema = (KeyValueSchema) schema;
-        KeyValueSchema keyValueSchemaFromGenericRecord = (KeyValueSchema) schemaFromGenericRecord;
+        KeyValueSchemaImpl keyValueSchema = (KeyValueSchemaImpl) schema;
+        KeyValueSchemaImpl keyValueSchemaFromGenericRecord = (KeyValueSchemaImpl) schemaFromGenericRecord;
         assertEquals(keyValueSchema.getSchemaInfo(), keyValueSchemaFromGenericRecord.getSchemaInfo());
 
         if (keyValueEncodingType == KeyValueEncodingType.SEPARATED) {
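A side point the SchemaTest changes above rely on: the component schemas of a KeyValue schema expose their native representation, so an AVRO value schema yields the underlying org.apache.avro.Schema. A minimal standalone sketch, with an illustrative Person pojo:

    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;

    public class NativeSchemaSketch {
        public static class Person {
            public String name;
            public int id;
        }

        public static void main(String[] args) {
            KeyValueSchemaImpl<String, Person> kv = (KeyValueSchemaImpl<String, Person>)
                    KeyValueSchemaImpl.of(Schema.STRING, Schema.AVRO(Person.class));
            // For an AVRO component, getNativeSchema() holds the Avro record schema.
            org.apache.avro.Schema avroSchema =
                    (org.apache.avro.Schema) kv.getValueSchema().getNativeSchema().get();
            System.out.println(avroSchema.getFullName());
        }
    }
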
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/KeyValueSchema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/KeyValueSchema.java
new file mode 100644
index 0000000..64da4af
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/schema/KeyValueSchema.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.schema;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+
+/**
+ * This interface models a Schema that is composed of two parts:
+ * a Key and a Value.
+ * @param <K> the type of the Key
+ * @param <V> the type of the Value.
+ */
+public interface KeyValueSchema<K,V> extends Schema<KeyValue<K,V>> {
+
+    /**
+     * Get the Schema of the Key.
+     * @return the Schema of the Key
+     */
+    Schema<K> getKeySchema();
+
+    /**
+     * Get the Schema of the Value.
+     *
+     * @return the Schema of the Value
+     */
+    Schema<V> getValueSchema();
+
+    /**
+     * Get the KeyValueEncodingType.
+     *
+     * @return the KeyValueEncodingType
+     * @see KeyValueEncodingType#INLINE
+     * @see KeyValueEncodingType#SEPARATED
+     */
+    KeyValueEncodingType getKeyValueEncodingType();
+}
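With this interface in the public API, applications can inspect a KeyValue reader schema without importing anything from the pulsar-client implementation module. A minimal consumer sketch, assuming a broker at pulsar://localhost:6650; topic and subscription names are illustrative:

    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.client.api.schema.KeyValueSchema;
    import org.apache.pulsar.common.schema.KeyValue;

    public class PublicKeyValueSchemaExample {
        public static void main(String[] args) throws Exception {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")
                    .build();
            Consumer<KeyValue<String, Integer>> consumer = client
                    .newConsumer(Schema.KeyValue(Schema.STRING, Schema.INT32))
                    .topic("kv-demo-topic")
                    .subscriptionName("kv-demo-sub")
                    .subscribe();
            Message<KeyValue<String, Integer>> msg = consumer.receive();
            // The cast targets the public API type, not the impl class.
            KeyValueSchema<?, ?> kvSchema = (KeyValueSchema<?, ?>) msg.getReaderSchema().get();
            System.out.println("key type: " + kvSchema.getKeySchema().getSchemaInfo().getType());
            System.out.println("encoding: " + kvSchema.getKeyValueEncodingType());
            consumer.close();
            client.close();
        }
    }
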
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
index dcee6a3..c0fa4d6 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/DefaultImplementation.java
@@ -301,28 +301,28 @@ public class DefaultImplementation {
     public static Schema<KeyValue<byte[], byte[]>> newKeyValueBytesSchema() {
         return catchExceptions(
                 () -> (Schema<KeyValue<byte[], byte[]>>) getStaticMethod(
-                    "org.apache.pulsar.client.impl.schema.KeyValueSchema",
+                    "org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl",
                         "kvBytes").invoke(null));
     }
 
     public static <K, V> Schema<KeyValue<K, V>> newKeyValueSchema(Schema<K> keySchema, Schema<V> valueSchema) {
         return catchExceptions(
                 () -> (Schema<KeyValue<K, V>>) getStaticMethod(
-                    "org.apache.pulsar.client.impl.schema.KeyValueSchema",
+                    "org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl",
                         "of", Schema.class, Schema.class).invoke(null, keySchema, valueSchema));
     }
 
     public static <K, V> Schema<KeyValue<K, V>> newKeyValueSchema(Schema<K> keySchema, Schema<V> valueSchema,
                                                                   KeyValueEncodingType keyValueEncodingType) {
         return catchExceptions(
-                () -> (Schema<KeyValue<K, V>>) getStaticMethod("org.apache.pulsar.client.impl.schema.KeyValueSchema",
+                () -> (Schema<KeyValue<K, V>>) getStaticMethod("org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl",
                         "of", Schema.class, Schema.class, KeyValueEncodingType.class)
                         .invoke(null, keySchema, valueSchema, keyValueEncodingType));
     }
 
     public static <K, V> Schema<KeyValue<K, V>> newKeyValueSchema(Class<K> key, Class<V> value, SchemaType type) {
         return catchExceptions(
-                () -> (Schema<KeyValue<K, V>>) getStaticMethod("org.apache.pulsar.client.impl.schema.KeyValueSchema",
+                () -> (Schema<KeyValue<K, V>>) getStaticMethod("org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl",
                         "of", Class.class, Class.class, SchemaType.class).invoke(null, key, value, type));
     }
 
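Because pulsar-client-api has no compile-time dependency on pulsar-client, DefaultImplementation resolves the implementation class by its fully qualified name, which is why the rename shows up here only as a string change. A hedged sketch of that reflective pattern, with error handling simplified to a thrown Exception:

    import java.lang.reflect.Method;
    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.common.schema.KeyValue;

    public class ReflectiveSchemaLookup {
        @SuppressWarnings("unchecked")
        static Schema<KeyValue<byte[], byte[]>> newKeyValueBytesSchema() throws Exception {
            // Locate the renamed implementation class and call its static factory.
            Method kvBytes = Class
                    .forName("org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl")
                    .getMethod("kvBytes");
            return (Schema<KeyValue<byte[], byte[]>>) kvBytes.invoke(null);
        }
    }
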
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 48d0464..efb66e5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -43,7 +43,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.schema.AbstractSchema;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
-import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
 import org.apache.pulsar.common.api.proto.KeyValue;
@@ -450,11 +450,11 @@ public class MessageImpl<T> implements Message<T> {
     }
 
 
-    private KeyValueSchema getKeyValueSchema() {
+    private KeyValueSchemaImpl getKeyValueSchema() {
         if (schema instanceof AutoConsumeSchema) {
-            return (KeyValueSchema) ((AutoConsumeSchema) schema).getInternalSchema(getSchemaVersion());
+            return (KeyValueSchemaImpl) ((AutoConsumeSchema) schema).getInternalSchema(getSchemaVersion());
         } else {
-            return (KeyValueSchema) schema;
+            return (KeyValueSchemaImpl) schema;
         }
     }
 
@@ -472,7 +472,7 @@ public class MessageImpl<T> implements Message<T> {
     }
     
     private T getKeyValueBySchemaVersion() {
-        KeyValueSchema kvSchema = getKeyValueSchema();
+        KeyValueSchemaImpl kvSchema = getKeyValueSchema();
         byte[] schemaVersion = getSchemaVersion();
         if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
             org.apache.pulsar.common.schema.KeyValue keyValue =
@@ -489,7 +489,7 @@ public class MessageImpl<T> implements Message<T> {
     }
 
     private T getKeyValue() {
-        KeyValueSchema kvSchema = getKeyValueSchema();
+        KeyValueSchemaImpl kvSchema = getKeyValueSchema();
         if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
             org.apache.pulsar.common.schema.KeyValue keyValue =
                     (org.apache.pulsar.common.schema.KeyValue) kvSchema.decode(getKeyBytes(), getData(), null);
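Both paths above come down to the encoding type: with SEPARATED, the key rides in the message key and the payload carries only the value, so the schema decodes two byte arrays plus an optional schema version. A minimal standalone sketch of that three-argument decode; STRING key and value schemas are illustrative:

    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
    import org.apache.pulsar.common.schema.KeyValue;
    import org.apache.pulsar.common.schema.KeyValueEncodingType;

    public class SeparatedDecodeSketch {
        public static void main(String[] args) {
            KeyValueSchemaImpl<String, String> kv = (KeyValueSchemaImpl<String, String>)
                    KeyValueSchemaImpl.of(Schema.STRING, Schema.STRING,
                            KeyValueEncodingType.SEPARATED);
            byte[] keyBytes = Schema.STRING.encode("k");   // would ride in the message key
            byte[] valueBytes = Schema.STRING.encode("v"); // the payload carries only this
            KeyValue<String, String> decoded = kv.decode(keyBytes, valueBytes, null);
            System.out.println(decoded.getKey() + " -> " + decoded.getValue());
        }
    }
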
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index 565502c..51dc992 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -35,7 +35,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
@@ -108,7 +108,7 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {
     @Override
     public TypedMessageBuilder<T> key(String key) {
         if (schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
-            KeyValueSchema kvSchema = (KeyValueSchema) schema;
+            KeyValueSchemaImpl kvSchema = (KeyValueSchemaImpl) schema;
             checkArgument(!(kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED),
                     "This method is not allowed to set keys when in encoding type is SEPARATED");
             if (key == null) {
@@ -123,8 +123,8 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {
 
     @Override
     public TypedMessageBuilder<T> keyBytes(byte[] key) {
-        if (schema instanceof KeyValueSchema && schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
-            KeyValueSchema kvSchema = (KeyValueSchema) schema;
+        if (schema instanceof KeyValueSchemaImpl && schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
+            KeyValueSchemaImpl kvSchema = (KeyValueSchemaImpl) schema;
             checkArgument(!(kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED),
                     "This method is not allowed to set keys when in encoding type is SEPARATED");
             if (key == null) {
@@ -151,7 +151,7 @@ public class TypedMessageBuilderImpl<T> implements TypedMessageBuilder<T> {
         }
         if (value instanceof org.apache.pulsar.common.schema.KeyValue
                 && schema.getSchemaInfo() != null && schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
-            KeyValueSchema kvSchema = (KeyValueSchema) schema;
+            KeyValueSchemaImpl kvSchema = (KeyValueSchemaImpl) schema;
             org.apache.pulsar.common.schema.KeyValue kv = (org.apache.pulsar.common.schema.KeyValue) value;
             if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
                 // set key as the message key
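The checkArgument guards above enforce one rule: under SEPARATED encoding the message key is derived from the KeyValue itself, so setting it explicitly through key() or keyBytes() is rejected. A minimal producer-side sketch; broker URL and topic name are illustrative:

    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.common.schema.KeyValue;
    import org.apache.pulsar.common.schema.KeyValueEncodingType;

    public class SeparatedKeyRuleExample {
        public static void main(String[] args) throws Exception {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")
                    .build();
            Producer<KeyValue<String, String>> producer = client
                    .newProducer(Schema.KeyValue(Schema.STRING, Schema.STRING,
                            KeyValueEncodingType.SEPARATED))
                    .topic("kv-separated-topic")
                    .create();
            // The key comes from the KeyValue; calling key("...") here would
            // fail the checkArgument above with an IllegalArgumentException.
            producer.newMessage()
                    .value(new KeyValue<>("user-1", "payload"))
                    .send();
            producer.close();
            client.close();
        }
    }
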
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
index 2ac31fc..8c63133 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java
@@ -225,7 +225,7 @@ public class AutoConsumeSchema implements Schema<GenericRecord> {
                         KeyValueSchemaInfo.decodeKeyValueSchemaInfo(schemaInfo);
                 Schema<?> keySchema = getSchema(kvSchemaInfo.getKey());
                 Schema<?> valueSchema = getSchema(kvSchemaInfo.getValue());
-                return KeyValueSchema.of(keySchema, valueSchema,
+                return KeyValueSchemaImpl.of(keySchema, valueSchema,
                         KeyValueSchemaInfo.decodeKeyValueEncodingType(schemaInfo));
             default:
                 throw new IllegalArgumentException("Retrieve schema instance from schema info for type '"
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
index a85aceb..3db9554 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java
@@ -74,9 +74,9 @@ public class AutoProduceBytesSchema<T> implements Schema<byte[]> {
 
         if (requireSchemaValidation) {
             // verify if the message can be decoded by the underlying schema
-            if (schema instanceof KeyValueSchema
-                    && ((KeyValueSchema) schema).getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) {
-                ((KeyValueSchema) schema).getValueSchema().validate(message);
+            if (schema instanceof KeyValueSchemaImpl
+                    && ((KeyValueSchemaImpl) schema).getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED)) {
+                ((KeyValueSchemaImpl) schema).getValueSchema().validate(message);
             } else {
                 schema.validate(message);
             }
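The SEPARATED branch above means an AUTO_PRODUCE producer validates the payload against the value schema alone, since the key bytes are not part of the payload. A minimal sketch of that check in isolation:

    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
    import org.apache.pulsar.common.schema.KeyValueEncodingType;

    public class SeparatedValidationSketch {
        public static void main(String[] args) {
            KeyValueSchemaImpl<String, String> kv = (KeyValueSchemaImpl<String, String>)
                    KeyValueSchemaImpl.of(Schema.STRING, Schema.STRING,
                            KeyValueEncodingType.SEPARATED);
            // Mirrors the SEPARATED branch: only the value schema sees the payload.
            kv.getValueSchema().validate(Schema.STRING.encode("some payload"));
            System.out.println("payload validated against the value schema only");
        }
    }
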
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaImpl.java
similarity index 85%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
rename to pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaImpl.java
index c33de77..b05a78f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaImpl.java
@@ -24,8 +24,8 @@ import java.util.concurrent.CompletableFuture;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
-import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
@@ -38,14 +38,13 @@ import org.apache.pulsar.common.schema.SchemaType;
  * [Key, Value] pair schema definition
  */
 @Slf4j
-public class KeyValueSchema<K, V> extends AbstractSchema<KeyValue<K, V>> {
+public class KeyValueSchemaImpl<K, V> extends AbstractSchema<KeyValue<K, V>> implements KeyValueSchema<K,V> {
+
 
-    @Getter
     private final Schema<K> keySchema;
-    @Getter
+
     private final Schema<V> valueSchema;
 
-    @Getter
     private final KeyValueEncodingType keyValueEncodingType;
 
     // schemaInfo combined by KeySchemaInfo and ValueSchemaInfo:
@@ -59,25 +58,25 @@ public class KeyValueSchema<K, V> extends AbstractSchema<KeyValue<K, V>> {
     public static <K, V> Schema<KeyValue<K, V>> of(Class<K> key, Class<V> value, SchemaType type) {
         checkArgument(SchemaType.JSON == type || SchemaType.AVRO == type);
         if (SchemaType.JSON == type) {
-            return new KeyValueSchema<>(JSONSchema.of(key), JSONSchema.of(value), KeyValueEncodingType.INLINE);
+            return new KeyValueSchemaImpl<>(JSONSchema.of(key), JSONSchema.of(value), KeyValueEncodingType.INLINE);
         } else {
             // AVRO
-            return new KeyValueSchema<>(AvroSchema.of(key), AvroSchema.of(value), KeyValueEncodingType.INLINE);
+            return new KeyValueSchemaImpl<>(AvroSchema.of(key), AvroSchema.of(value), KeyValueEncodingType.INLINE);
         }
     }
 
 
     public static <K, V> Schema<KeyValue<K, V>> of(Schema<K> keySchema, Schema<V> valueSchema) {
-        return new KeyValueSchema<>(keySchema, valueSchema, KeyValueEncodingType.INLINE);
+        return new KeyValueSchemaImpl<>(keySchema, valueSchema, KeyValueEncodingType.INLINE);
     }
 
     public static <K, V> Schema<KeyValue<K, V>> of(Schema<K> keySchema,
                                                    Schema<V> valueSchema,
                                                    KeyValueEncodingType keyValueEncodingType) {
-        return new KeyValueSchema<>(keySchema, valueSchema, keyValueEncodingType);
+        return new KeyValueSchemaImpl<>(keySchema, valueSchema, keyValueEncodingType);
     }
 
-    private static final Schema<KeyValue<byte[], byte[]>> KV_BYTES = new KeyValueSchema<>(
+    private static final Schema<KeyValue<byte[], byte[]>> KV_BYTES = new KeyValueSchemaImpl<>(
         BytesSchema.of(),
         BytesSchema.of());
 
@@ -90,14 +89,14 @@ public class KeyValueSchema<K, V> extends AbstractSchema<KeyValue<K, V>> {
         return keySchema.supportSchemaVersioning() || valueSchema.supportSchemaVersioning();
     }
 
-    private KeyValueSchema(Schema<K> keySchema,
-                           Schema<V> valueSchema) {
+    private KeyValueSchemaImpl(Schema<K> keySchema,
+                               Schema<V> valueSchema) {
         this(keySchema, valueSchema, KeyValueEncodingType.INLINE);
     }
 
-    private KeyValueSchema(Schema<K> keySchema,
-                           Schema<V> valueSchema,
-                           KeyValueEncodingType keyValueEncodingType) {
+    private KeyValueSchemaImpl(Schema<K> keySchema,
+                               Schema<V> valueSchema,
+                               KeyValueEncodingType keyValueEncodingType) {
         this.keySchema = keySchema;
         this.valueSchema = valueSchema;
         this.keyValueEncodingType = keyValueEncodingType;
@@ -223,7 +222,7 @@ public class KeyValueSchema<K, V> extends AbstractSchema<KeyValue<K, V>> {
 
     @Override
     public Schema<KeyValue<K, V>> clone() {
-        return KeyValueSchema.of(keySchema.clone(), valueSchema.clone(), keyValueEncodingType);
+        return KeyValueSchemaImpl.of(keySchema.clone(), valueSchema.clone(), keyValueEncodingType);
     }
 
     private void buildKeyValueSchemaInfo() {
@@ -285,8 +284,38 @@ public class KeyValueSchema<K, V> extends AbstractSchema<KeyValue<K, V>> {
         } else {
             Schema<?> keySchema = this.keySchema instanceof AbstractSchema ? ((AbstractSchema) this.keySchema).atSchemaVersion(schemaVersion) : this.keySchema;
             Schema<?> valueSchema = this.valueSchema instanceof AbstractSchema ? ((AbstractSchema) this.valueSchema).atSchemaVersion(schemaVersion) : this.valueSchema;
-            return KeyValueSchema.of(keySchema, valueSchema, keyValueEncodingType);
+            return KeyValueSchemaImpl.of(keySchema, valueSchema, keyValueEncodingType);
         }
     }
 
+    /**
+     * Get the Schema of the Key.
+     * @return the Schema of the Key
+     */
+    @Override
+    public Schema<K> getKeySchema() {
+        return keySchema;
+    }
+
+    /**
+     * Get the Schema of the Value.
+     *
+     * @return the Schema of the Value
+     */
+    @Override
+    public Schema<V> getValueSchema() {
+        return valueSchema;
+    }
+
+    /**
+     * Get the KeyValueEncodingType.
+     *
+     * @return the KeyValueEncodingType
+     * @see KeyValueEncodingType#INLINE
+     * @see KeyValueEncodingType#SEPARATED
+     */
+    @Override
+    public KeyValueEncodingType getKeyValueEncodingType() {
+        return keyValueEncodingType;
+    }
 }
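After the rename, instances returned by the static factories also satisfy the new public KeyValueSchema interface, so callers can migrate off the impl type while keeping the same behavior. A minimal sketch:

    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.client.api.schema.KeyValueSchema;
    import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
    import org.apache.pulsar.common.schema.KeyValue;
    import org.apache.pulsar.common.schema.KeyValueEncodingType;

    public class RenameCompatSketch {
        public static void main(String[] args) {
            Schema<KeyValue<String, Long>> schema = KeyValueSchemaImpl.of(
                    Schema.STRING, Schema.INT64, KeyValueEncodingType.INLINE);
            // The instance satisfies the public interface introduced above,
            // so application code no longer needs the impl class.
            KeyValueSchema<String, Long> kv = (KeyValueSchema<String, Long>) schema;
            System.out.println(kv.getKeyValueEncodingType()); // INLINE
        }
    }
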
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
index 4692290..b0a86c2 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaTest.java
@@ -51,13 +51,13 @@ public class KeyValueSchemaTest {
         assertEquals(keyValueSchema1.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
         assertEquals(keyValueSchema2.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
 
-        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchemaImpl<Foo, Bar>) keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
                 SchemaType.AVRO);
-        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchemaImpl<Foo, Bar>) keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
                 SchemaType.AVRO);
-        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchemaImpl<Foo, Bar>) keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
                 SchemaType.AVRO);
-        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchemaImpl<Foo, Bar>) keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
                 SchemaType.AVRO);
 
         String schemaInfo1 = new String(keyValueSchema1.getSchemaInfo().getSchema());
@@ -103,13 +103,13 @@ public class KeyValueSchemaTest {
         assertEquals(keyValueSchema1.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
         assertEquals(keyValueSchema2.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
 
-        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchemaImpl<Foo, Bar>) keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
             SchemaType.AVRO);
-        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchemaImpl<Foo, Bar>) keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
             SchemaType.AVRO);
-        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchemaImpl<Foo, Bar>) keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
             SchemaType.AVRO);
-        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchemaImpl<Foo, Bar>) keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
             SchemaType.AVRO);
 
         String schemaInfo1 = new String(keyValueSchema1.getSchemaInfo().getSchema());
@@ -130,17 +130,17 @@ public class KeyValueSchemaTest {
         assertEquals(keyValueSchema2.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
         assertEquals(keyValueSchema3.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
 
-        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchemaImpl<Foo, Bar>) keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
                 SchemaType.JSON);
-        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchemaImpl<Foo, Bar>) keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
                 SchemaType.JSON);
-        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchemaImpl<Foo, Bar>) keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
                 SchemaType.JSON);
-        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchemaImpl<Foo, Bar>) keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
                 SchemaType.JSON);
-        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema3).getKeySchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchemaImpl<Foo, Bar>) keyValueSchema3).getKeySchema().getSchemaInfo().getType(),
                 SchemaType.JSON);
-        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema3).getValueSchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchemaImpl<Foo, Bar>) keyValueSchema3).getValueSchema().getSchemaInfo().getType(),
                 SchemaType.JSON);
 
         String schemaInfo1 = new String(keyValueSchema1.getSchemaInfo().getSchema());
@@ -168,17 +168,17 @@ public class KeyValueSchemaTest {
         assertEquals(keyValueSchema2.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
         assertEquals(keyValueSchema3.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
 
-        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchemaImpl<Foo, Bar>) keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
             SchemaType.JSON);
-        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchemaImpl<Foo, Bar>) keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
             SchemaType.JSON);
-        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchemaImpl<Foo, Bar>) keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
             SchemaType.JSON);
-        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchemaImpl<Foo, Bar>) keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
             SchemaType.JSON);
-        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema3).getKeySchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchemaImpl<Foo, Bar>) keyValueSchema3).getKeySchema().getSchemaInfo().getType(),
             SchemaType.JSON);
-        assertEquals(((KeyValueSchema<Foo, Bar>) keyValueSchema3).getValueSchema().getSchemaInfo().getType(),
+        assertEquals(((KeyValueSchemaImpl<Foo, Bar>) keyValueSchema3).getValueSchema().getSchemaInfo().getType(),
             SchemaType.JSON);
 
         String schemaInfo1 = new String(keyValueSchema1.getSchemaInfo().getSchema());
@@ -324,7 +324,7 @@ public class KeyValueSchemaTest {
         } catch (SchemaSerializationException e) {
             Assert.assertTrue(e.getMessage().contains("This method cannot be used under this SEPARATED encoding type"));
         }
-        KeyValue<Foo, Bar>  keyValue = ((KeyValueSchema)keyValueSchema).decode(fooSchema.encode(foo), encodeBytes, null);
+        KeyValue<Foo, Bar>  keyValue = ((KeyValueSchemaImpl)keyValueSchema).decode(fooSchema.encode(foo), encodeBytes, null);
         Foo fooBack = keyValue.getKey();
         Bar barBack = keyValue.getValue();
         assertEquals(foo, fooBack);
@@ -391,9 +391,9 @@ public class KeyValueSchemaTest {
 
     @Test
     public void testKeyValueSchemaSeparatedEncoding() {
-        KeyValueSchema<String, String> keyValueSchema = (KeyValueSchema<String,String>)
-                KeyValueSchema.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED);
-        KeyValueSchema<String, String> keyValueSchema2 = (KeyValueSchema<String,String>)
+        KeyValueSchemaImpl<String, String> keyValueSchema = (KeyValueSchemaImpl<String,String>)
+                KeyValueSchemaImpl.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED);
+        KeyValueSchemaImpl<String, String> keyValueSchema2 = (KeyValueSchemaImpl<String,String>)
                 AutoConsumeSchema.getSchema(keyValueSchema.getSchemaInfo());
         assertEquals(keyValueSchema.getKeyValueEncodingType(), keyValueSchema2.getKeyValueEncodingType());
     }
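The reworked testKeyValueSchemaSeparatedEncoding above also documents a useful invariant: rebuilding a schema from its serialized SchemaInfo preserves the encoding type. A standalone sketch of that round trip:

    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
    import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
    import org.apache.pulsar.common.schema.KeyValueEncodingType;

    public class EncodingRoundTripSketch {
        public static void main(String[] args) {
            KeyValueSchemaImpl<String, String> original = (KeyValueSchemaImpl<String, String>)
                    KeyValueSchemaImpl.of(Schema.STRING, Schema.STRING,
                            KeyValueEncodingType.SEPARATED);
            // Rebuild from the serialized SchemaInfo; SEPARATED survives the trip.
            KeyValueSchemaImpl<?, ?> rebuilt = (KeyValueSchemaImpl<?, ?>)
                    AutoConsumeSchema.getSchema(original.getSchemaInfo());
            System.out.println(rebuilt.getKeyValueEncodingType()); // SEPARATED
        }
    }
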
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java
index 40f879e..c40fcf0 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/SupportVersioningKeyValueSchemaTest.java
@@ -40,7 +40,7 @@ public class SupportVersioningKeyValueSchemaTest {
                 SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
         AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(
                 SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
-        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = KeyValueSchema.of(
+        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = KeyValueSchemaImpl.of(
                 fooSchema, barSchema);
         keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
 
@@ -78,7 +78,7 @@ public class SupportVersioningKeyValueSchemaTest {
                 SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
         AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(
                 SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
-        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = KeyValueSchema.of(
+        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = KeyValueSchemaImpl.of(
                 fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
         keyValueSchema.setSchemaInfoProvider(multiVersionSchemaInfoProvider);
 
@@ -96,7 +96,7 @@ public class SupportVersioningKeyValueSchemaTest {
         foo.setColor(SchemaTestUtils.Color.RED);
 
         byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
-        KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = ((KeyValueSchema)keyValueSchema).decode(
+        KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = ((KeyValueSchemaImpl)keyValueSchema).decode(
                 fooSchema.encode(foo), encodeBytes, new byte[10]);
         Assert.assertTrue(keyValue.getValue().isField1());
         Assert.assertEquals(
@@ -110,7 +110,7 @@ public class SupportVersioningKeyValueSchemaTest {
                 SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
         AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(
                 SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
-        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = KeyValueSchema.of(
+        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = KeyValueSchemaImpl.of(
                 fooSchema, barSchema);
 
         SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
@@ -143,7 +143,7 @@ public class SupportVersioningKeyValueSchemaTest {
                 SchemaDefinition.<SchemaTestUtils.Foo>builder().withPojo(SchemaTestUtils.Foo.class).build());
         AvroSchema<SchemaTestUtils.Bar> barSchema = AvroSchema.of(
                 SchemaDefinition.<SchemaTestUtils.Bar>builder().withPojo(SchemaTestUtils.Bar.class).build());
-        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = KeyValueSchema.of(
+        Schema<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> keyValueSchema = KeyValueSchemaImpl.of(
                 fooSchema, barSchema, KeyValueEncodingType.SEPARATED);
 
         SchemaTestUtils.Bar bar = new SchemaTestUtils.Bar();
@@ -157,7 +157,7 @@ public class SupportVersioningKeyValueSchemaTest {
         foo.setColor(SchemaTestUtils.Color.RED);
 
         byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
-        KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = ((KeyValueSchema)keyValueSchema).decode(
+        KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = ((KeyValueSchemaImpl)keyValueSchema).decode(
                 fooSchema.encode(foo), encodeBytes, new byte[10]);
         Assert.assertTrue(keyValue.getValue().isField1());
         Assert.assertEquals(
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
index e2fc5d3..bcb5f92 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImplTest.java
@@ -33,7 +33,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericSchema;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
-import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
@@ -133,12 +133,12 @@ public class GenericSchemaImplTest {
         for (Schema<Foo> keySchema : encodeSchemas) {
             for (Schema<Foo> valueSchema : encodeSchemas) {
                 // configure encode schema
-                Schema<KeyValue<Foo, Foo>> kvSchema = KeyValueSchema.of(
+                Schema<KeyValue<Foo, Foo>> kvSchema = KeyValueSchemaImpl.of(
                     keySchema, valueSchema
                 );
 
                 // configure decode schema
-                Schema<KeyValue<GenericRecord, GenericRecord>> decodeSchema = KeyValueSchema.of(
+                Schema<KeyValue<GenericRecord, GenericRecord>> decodeSchema = KeyValueSchemaImpl.of(
                     Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME()
                 );
                 decodeSchema.configureSchemaInfo(
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
index 6e3c942..b1b4eaa 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaTest.java
@@ -24,7 +24,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericSchema;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
-import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
 import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Foo;
@@ -133,12 +133,12 @@ public class GenericSchemaTest {
         for (Schema<Foo> keySchema : encodeSchemas) {
             for (Schema<Foo> valueSchema : encodeSchemas) {
                 // configure encode schema
-                Schema<KeyValue<Foo, Foo>> kvSchema = KeyValueSchema.of(
+                Schema<KeyValue<Foo, Foo>> kvSchema = KeyValueSchemaImpl.of(
                     keySchema, valueSchema
                 );
 
                 // configure decode schema
-                Schema<KeyValue<GenericRecord, GenericRecord>> decodeSchema = KeyValueSchema.of(
+                Schema<KeyValue<GenericRecord, GenericRecord>> decodeSchema = KeyValueSchemaImpl.of(
                     Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME()
                 );
                 decodeSchema.configureSchemaInfo(
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
index 83b3689..d0ec4d6 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
@@ -26,7 +26,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
-import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.functions.api.KVRecord;
 import org.apache.pulsar.functions.api.Record;
 
@@ -119,7 +119,7 @@ public class SinkRecord<T> implements Record<T> {
 
         if (sourceRecord instanceof KVRecord) {
             KVRecord kvRecord = (KVRecord) sourceRecord;
-            return KeyValueSchema.of(kvRecord.getKeySchema(), kvRecord.getValueSchema(),
+            return KeyValueSchemaImpl.of(kvRecord.getKeySchema(), kvRecord.getValueSchema(),
                     kvRecord.getKeyValueEncodingType());
         }
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 552aa9706..5bc43a2 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -49,8 +49,8 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
 import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
-import org.apache.pulsar.client.impl.schema.KeyValueSchema;
 import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.CryptoConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index e9d8ec9..a75f1ee 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -34,10 +34,9 @@ import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
-import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.schema.GenericObject;
-import org.apache.pulsar.client.impl.schema.KeyValueSchema;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
index a1eb2a3..7c56a4b 100644
--- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
+++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarRecordCursor.java
@@ -35,7 +35,7 @@ import org.apache.bookkeeper.mledger.impl.ReadOnlyCursorImpl;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.stats.NullStatsProvider;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.Commands;
@@ -153,7 +153,7 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
         for (KeyValueEncodingType encodingType :
                 Arrays.asList(KeyValueEncodingType.INLINE, KeyValueEncodingType.SEPARATED)) {
 
-            KeyValueSchema schema = (KeyValueSchema) Schema.KeyValue(Schema.JSON(Foo.class), Schema.AVRO(Boo.class),
+            KeyValueSchemaImpl schema = (KeyValueSchemaImpl) Schema.KeyValue(Schema.JSON(Foo.class), Schema.AVRO(Boo.class),
                     encodingType);
 
             Foo foo = new Foo();
@@ -218,7 +218,7 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
         for (KeyValueEncodingType encodingType :
                 Arrays.asList(KeyValueEncodingType.INLINE, KeyValueEncodingType.SEPARATED)) {
 
-            KeyValueSchema schema = (KeyValueSchema) Schema.KeyValue(Schema.INT32, Schema.STRING,
+            KeyValueSchemaImpl schema = (KeyValueSchemaImpl) Schema.KeyValue(Schema.INT32, Schema.STRING,
                     encodingType);
 
             String value = "primitive_message_value";
@@ -271,7 +271,7 @@ public class TestPulsarRecordCursor extends TestPulsarConnector {
      * @throws Exception
      */
     private PulsarRecordCursor mockKeyValueSchemaPulsarRecordCursor(final Long entriesNum, final TopicName topicName,
-                                                                    final KeyValueSchema schema, KeyValue message, List<PulsarColumnHandle> ColumnHandles) throws Exception {
+                                                                    final KeyValueSchemaImpl schema, KeyValue message, List<PulsarColumnHandle> ColumnHandles) throws Exception {
 
         ManagedLedgerFactory managedLedgerFactory = mock(ManagedLedgerFactory.class);
 
diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
index 63b3bac..d131e5b 100644
--- a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
+++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.tests.integration.io;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.schema.GenericObject;
-import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.api.schema.KeyValueSchema;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
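With the import switched to the public interface, a sink can stay entirely on the API surface when inspecting KeyValue records. A minimal sketch; the class and method are illustrative, not the test sink itself:

    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.client.api.schema.GenericObject;
    import org.apache.pulsar.client.api.schema.KeyValueSchema;
    import org.apache.pulsar.common.schema.SchemaType;
    import org.apache.pulsar.functions.api.Record;

    public class KvAwareSinkSketch {
        public void inspect(Record<GenericObject> record) {
            Schema<GenericObject> schema = record.getSchema();
            if (schema != null && schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
                // The public interface is enough; no pulsar-client impl import needed.
                KeyValueSchema<?, ?> kv = (KeyValueSchema<?, ?>) schema;
                System.out.println("key component: "
                        + kv.getKeySchema().getSchemaInfo().getType());
            }
        }
    }
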
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOTestRunner.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOTestRunner.java
index cb9ffe4..0cc4ef5 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOTestRunner.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOTestRunner.java
@@ -21,14 +21,13 @@ package org.apache.pulsar.tests.integration.io;
 import java.time.Duration;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 
@@ -59,9 +58,9 @@ public abstract class PulsarIOTestRunner {
     @SuppressWarnings("rawtypes")
 	protected Schema getSchema(boolean jsonWithEnvelope) {
         if (jsonWithEnvelope) {
-            return KeyValueSchema.kvBytes();
+            return KeyValueSchemaImpl.kvBytes();
         } else {
-            return KeyValueSchema.of(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME(), KeyValueEncodingType.SEPARATED);
+            return KeyValueSchemaImpl.of(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME(), KeyValueEncodingType.SEPARATED);
         }
     }
     
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
index eb9db59..9104390 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
@@ -32,7 +32,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
-import org.apache.pulsar.client.impl.schema.KeyValueSchema;
+import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.KeyValue;
@@ -41,8 +41,6 @@ import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.tests.integration.docker.ContainerExecException;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -104,7 +102,7 @@ public class TestBasicPresto extends TestPulsarSQLBase {
             schemaFlag = schema.getSchemaInfo().getType().name();
         } else if(schema.getSchemaInfo().getType().equals(SchemaType.KEY_VALUE)) {
             schemaFlag = schema.getSchemaInfo().getType().name() + "_"
-                    + ((KeyValueSchema) schema).getKeyValueEncodingType();
+                    + ((KeyValueSchemaImpl) schema).getKeyValueEncodingType();
         } else {
             // Because some schema types are same(such as BYTES and BYTEBUFFER), so use the schema name as flag.
             schemaFlag = schema.getSchemaInfo().getName();
@@ -313,7 +311,7 @@ public class TestBasicPresto extends TestPulsarSQLBase {
             case KEY_VALUE:
                 validateContentForKeyValueSchema(messageNum, contentArr);
                 log.info("finish validate content for KEY_VALUE {} schema type.",
-                        ((KeyValueSchema) schema).getKeyValueEncodingType());
+                        ((KeyValueSchemaImpl) schema).getKeyValueEncodingType());
         }
     }