Posted to commits@pulsar.apache.org by ur...@apache.org on 2022/12/14 01:36:53 UTC

[pulsar-site] branch main updated: Docs sync done from apache/pulsar (#fdba644)

This is an automated email from the ASF dual-hosted git repository.

urfree pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git


The following commit(s) were added to refs/heads/main by this push:
     new 5de9e1782ae Docs sync done from apache/pulsar (#fdba644)
5de9e1782ae is described below

commit 5de9e1782aef0ffc21da6ff47f2ca443058683e6
Author: github-actions[bot] <41...@users.noreply.github.com>
AuthorDate: Wed Dec 14 01:36:48 2022 +0000

    Docs sync done from apache/pulsar (#fdba644)
---
 site2/website-next/docs/admin-api-schemas.md       | 426 +++++-----
 site2/website-next/docs/client-libraries-python.md | 173 +---
 .../docs/schema-evolution-compatibility.md         | 168 +---
 site2/website-next/docs/schema-get-started.md      | 909 ++++++++++++++++-----
 site2/website-next/docs/schema-overview.md         | 114 ++-
 site2/website-next/docs/schema-understand.md       | 364 ++++++---
 site2/website-next/sidebars.json                   |   3 +-
 .../website-next/static/assets/schema-consumer.svg |   1 +
 .../website-next/static/assets/schema-producer.svg |   1 +
 site2/website-next/static/assets/schema.svg        |   1 +
 10 files changed, 1256 insertions(+), 904 deletions(-)

diff --git a/site2/website-next/docs/admin-api-schemas.md b/site2/website-next/docs/admin-api-schemas.md
index 609135e1a44..77326c3a9c8 100644
--- a/site2/website-next/docs/admin-api-schemas.md
+++ b/site2/website-next/docs/admin-api-schemas.md
@@ -14,7 +14,7 @@ import TabItem from '@theme/TabItem';
 
 This page only shows **some frequently used operations**.
 
-- For the latest and complete information about `Pulsar admin`, including commands, flags, descriptions, and more, see [Pulsar admin doc](/tools/pulsar-admin/)
+- For the latest and complete information about `Pulsar admin`, including commands, flags, descriptions, and more, see [Pulsar admin doc](/tools/pulsar-admin/).
 
 - For the latest and complete information about `REST API`, including parameters, responses, samples, and more, see {@inject: rest:REST:/} API doc.
 
@@ -22,53 +22,7 @@ This page only shows **some frequently used operations**.
 
 :::
 
-## Manage AutoUpdate strategy
-
-### Enable AutoUpdate
-
-To enable `AutoUpdate` on a namespace, you can use the `pulsar-admin` command.
-
-```bash
-bin/pulsar-admin namespaces set-is-allow-auto-update-schema --enable tenant/namespace
-```
-
-### Disable AutoUpdate 
-
-To disable `AutoUpdate` on a namespace, you can use the `pulsar-admin` command.
-
-```bash
-bin/pulsar-admin namespaces set-is-allow-auto-update-schema --disable tenant/namespace
-```
-
-Once the `AutoUpdate` is disabled, you can only register a new schema using the `pulsar-admin` command.
-
-### Adjust compatibility
-
-To adjust the schema compatibility level on a namespace, you can use the `pulsar-admin` command.
-
-```bash
-bin/pulsar-admin namespaces set-schema-compatibility-strategy --compatibility <compatibility-level> tenant/namespace
-```
-
-## Schema validation
-
-### Enable schema validation
-
-To enable `schemaValidationEnforced` on a namespace, you can use the `pulsar-admin` command.
-
-```bash
-bin/pulsar-admin namespaces set-schema-validation-enforce --enable tenant/namespace
-```
-
-### Disable schema validation
-
-To disable `schemaValidationEnforced` on a namespace, you can use the `pulsar-admin` command.
-
-```bash
-bin/pulsar-admin namespaces set-schema-validation-enforce --disable tenant/namespace
-```
-
-## Schema manual management
+## Manage schema
 
 ### Upload a schema
 
@@ -97,17 +51,9 @@ The `schema-definition-file` is in JSON format.
 }
 ```
 
-The `schema-definition-file` includes the following fields:
-
-| Field |  Description | 
-| --- | --- |
-|  `type`  |   The schema type. | 
-|  `schema`  |   The schema definition data, which is encoded in UTF 8 charset. <li>If the schema is a **primitive** schema, this field should be blank. </li><li>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition. </li> | 
-|  `properties`  |  The additional properties associated with the schema. | 
+The following is an example of the `schema-definition-file` for a JSON schema.
 
-Here are examples of the `schema-definition-file` for a JSON schema.
-
-**Example 1**
+**Example**
 
 ```json
 {
@@ -117,18 +63,6 @@ Here are examples of the `schema-definition-file` for a JSON schema.
 }
 ```
 
-**Example 2**
-
-```json
-{
-    "type": "STRING",
-    "schema": "",
-    "properties": {
-        "key1": "value1"
-    }
-}
-```
-
 </TabItem>
 <TabItem value="REST API">
 
@@ -144,14 +78,6 @@ The post payload is in JSON format.
 }
 ```
 
-The post payload includes the following fields:
-
-| Field |  Description | 
-| --- | --- |
-|  `type`  |   The schema type. | 
-|  `schema`  |   The schema definition data, which is encoded in UTF 8 charset. <li>If the schema is a **primitive** schema, this field should be blank. </li><li>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition. </li> | 
-|  `properties`  |  The additional properties associated with the schema. |
-
 </TabItem>
 <TabItem value="Java Admin API">
 
@@ -159,14 +85,6 @@ The post payload includes the following fields:
 void createSchema(String topic, PostSchemaPayload schemaPayload)
 ```
 
-The `PostSchemaPayload` includes the following fields:
-
-| Field |  Description | 
-| --- | --- |
-|  `type`  |   The schema type. | 
-|  `schema`  |   The schema definition data, which is encoded in UTF 8 charset. <li>If the schema is a **primitive** schema, this field should be blank. </li><li>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition. </li> | 
-|  `properties`  |  The additional properties associated with the schema. | 
-
 Here is an example of `PostSchemaPayload`:
 
 ```java
@@ -180,11 +98,10 @@ admin.createSchema("my-tenant/my-ns/my-topic", payload);
 ```
 
 </TabItem>
-
 </Tabs>
 ````
 
-### Get a schema (latest)
+### Get the latest schema
 
 To get the latest schema for a topic, you can use one of the following methods. 
 
@@ -233,16 +150,6 @@ Here is an example of a response, which is returned in JSON format.
 }
 ```
 
-The response includes the following fields:
-
-| Field |  Description | 
-| --- | --- |
-|  `version`  |   The schema version, which is a long number. | 
-|  `type`  |   The schema type. | 
-|  `timestamp`  |   The timestamp of creating this version of schema. | 
-|  `data`  |   The schema definition data, which is encoded in UTF 8 charset. <li>If the schema is a **primitive** schema, this field should be blank. </li><li>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition. </li> | 
-|  `properties`  |  The additional properties associated with the schema. |
-
 </TabItem>
 <TabItem value="Java Admin API">
 
@@ -250,15 +157,6 @@ The response includes the following fields:
 SchemaInfo createSchema(String topic)
 ```
 
-The `SchemaInfo` includes the following fields:
-
-| Field |  Description | 
-| --- | --- |
-|  `name`  |   The schema name. | 
-|  `type`  |   The schema type. | 
-|  `schema`  |   A byte array of the schema definition data, which is encoded in UTF 8 charset. <li>If the schema is a **primitive** schema, this byte array should be empty. </li><li>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition converted to a byte array. </li> | 
-|  `properties`  |  The additional properties associated with the schema. | 
-
 Here is an example of `SchemaInfo`:
 
 ```java
@@ -268,11 +166,10 @@ SchemaInfo si = admin.getSchema("my-tenant/my-ns/my-topic");
 ```
 
 </TabItem>
-
 </Tabs>
 ````
 
-### Get a schema (specific)
+### Get a specific schema
 
 To get a specific version of a schema, you can use one of the following methods.
 
@@ -306,16 +203,6 @@ Here is an example of a response, which is returned in JSON format.
 }
 ```
 
-The response includes the following fields:
-
-| Field |  Description | 
-| --- | --- |
-|  `version`  |   The schema version, which is a long number. | 
-|  `type`  |   The schema type. | 
-|  `timestamp`  |   The timestamp of creating this version of schema. | 
-|  `data`  |   The schema definition data, which is encoded in UTF 8 charset. <li>If the schema is a **primitive** schema, this field should be blank. </li><li>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition. </li> | 
-|  `properties`  |  The additional properties associated with the schema. |
-
 </TabItem>
 <TabItem value="Java Admin API">
 
@@ -323,15 +210,6 @@ The response includes the following fields:
 SchemaInfo createSchema(String topic, long version)
 ```
 
-The `SchemaInfo` includes the following fields:
-
-| Field |  Description | 
-| --- | --- |
-|  `name`  |  The schema name. | 
-|  `type`  |  The schema type. | 
-|  `schema`  |   A byte array of the schema definition data, which is encoded in UTF 8. <li>If the schema is a **primitive** schema, this byte array should be empty. </li><li>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition converted to a byte array. </li> | 
-|  `properties`  |  The additional properties associated with the schema. | 
-
 Here is an example of `SchemaInfo`:
 
 ```java
@@ -341,13 +219,12 @@ SchemaInfo si = admin.getSchema("my-tenant/my-ns/my-topic", 1L);
 ```
 
 </TabItem>
-
 </Tabs>
 ````
 
 ### Extract a schema
 
-To provide a schema via a topic, you can use the following method.
+To extract (provide) a schema via a topic, use the following method.
 
 ````mdx-code-block
 <Tabs groupId="api-choice"
@@ -363,20 +240,19 @@ pulsar-admin schemas extract --classname <class-name> --jar <jar-path> --type <t
 ```
 
 </TabItem>
-
 </Tabs>
 ````
 
 ### Delete a schema
 
-To delete a schema for a topic, you can use one of the following methods.
-
 :::note
 
-In any case, the **delete** action deletes **all versions** of a schema registered for a topic.
+In any case, the `delete` action deletes **all versions** of a schema registered for a topic.
 
 :::
 
+To delete a schema for a topic, you can use one of the following methods.
+
 ````mdx-code-block
 <Tabs groupId="api-choice"
   defaultValue="Admin CLI"
@@ -395,7 +271,7 @@ pulsar-admin schemas delete <topic-name>
 
 Send a `DELETE` request to a schema endpoint: {@inject: endpoint|DELETE|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/deleteSchema?version=@pulsar:version_number@} 
 
-Here is an example of a response, which is returned in JSON format.
+Here is an example of a response returned in JSON format.
 
 ```json
 {
@@ -403,12 +279,6 @@ Here is an example of a response, which is returned in JSON format.
 }
 ```
 
-The response includes the following field:
-
-Field | Description |
----|---|
-`version` | The schema version, which is a long number. |
-
 </TabItem>
 <TabItem value="Java Admin API">
 
@@ -425,26 +295,63 @@ admin.deleteSchema("my-tenant/my-ns/my-topic");
 ```
 
 </TabItem>
-
 </Tabs>
 ````
 
-## Set schema compatibility check strategy 
+## Manage schema AutoUpdate
 
-You can set [schema compatibility check strategy](schema-evolution-compatibility.md#schema-compatibility-check-strategy) at the topic, namespace or broker level. 
+### Enable schema AutoUpdate
 
-The schema compatibility check strategy set at different levels has priority: topic level > namespace level > broker level. 
+To enable schema auto-update at the **namespace** level, you can use one of the following methods.
 
-- If you set the strategy at both topic and namespace levels, it uses the topic-level strategy. 
+````mdx-code-block
+<Tabs groupId="api-choice"
+  defaultValue="Admin CLI"
+  values={[{"label":"Admin CLI","value":"Admin CLI"},{"label":"REST API","value":"REST API"},{"label":"Java Admin API","value":"Java Admin API"}]}>
+
+<TabItem value="Admin CLI">
+
+Use the `set-is-allow-auto-update-schema` subcommand. 
+
+```bash
+bin/pulsar-admin namespaces set-is-allow-auto-update-schema --enable tenant/namespace
+```
+
+</TabItem>
+<TabItem value="REST API">
+
+Send a `POST` request to a namespace endpoint: {@inject: endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/isAllowAutoUpdateSchema|operation/isAllowAutoUpdateSchema?version=@pulsar:version_number@}
+
+The post payload is in JSON format.
+
+```json
+{
+  "isAllowAutoUpdateSchema": "true"
+}
+```
+
+</TabItem>
+<TabItem value="Java Admin API">
+
+Here is an example of enabling schema auto-update for a tenant/namespace.
+
+```java
+admin.namespaces().setIsAllowAutoUpdateSchema("my-tenant/my-namespace", true);
+```
+
+</TabItem>
+</Tabs>
+````
 
-- If you set the strategy at both namespace and broker levels, it uses the namespace-level strategy.
+### Disable schema AutoUpdate
 
-- If you do not set the strategy at any level, it uses the `FULL` strategy. For all available values, see [here](schema-evolution-compatibility.md#schema-compatibility-check-strategy).
+:::note
 
+When schema auto-update is disabled, you can only [register a new schema](#upload-a-schema).
 
-### Topic level
+:::
 
-To set a schema compatibility check strategy at the topic level, use one of the following methods.
+To disable schema auto-update at the **namespace** level, you can use one of the following methods.
 
 ````mdx-code-block
 <Tabs groupId="api-choice"
@@ -453,38 +360,45 @@ To set a schema compatibility check strategy at the topic level, use one of the
 
 <TabItem value="Admin CLI">
 
-Use the [`pulsar-admin topicPolicies set-schema-compatibility-strategy`](/tools/pulsar-admin/) command. 
+Use the `set-is-allow-auto-update-schema` subcommand. 
 
-```shell
-pulsar-admin topicPolicies set-schema-compatibility-strategy <strategy> <topicName>
+```bash
+bin/pulsar-admin namespaces set-is-allow-auto-update-schema --disable tenant/namespace
 ```
 
 </TabItem>
 <TabItem value="REST API">
 
-Send a `PUT` request to this endpoint: {@inject: endpoint|PUT|/admin/v2/topics/:tenant/:namespace/:topic|operation/schemaCompatibilityStrategy?version=@pulsar:version_number@}
+Send a `POST` request to a namespace endpoint: {@inject: endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/isAllowAutoUpdateSchema|operation/isAllowAutoUpdateSchema?version=@pulsar:version_number@}
 
-</TabItem>
-<TabItem value="Java Admin API">
+The post payload is in JSON format.
 
-```java
-void setSchemaCompatibilityStrategy(String topic, SchemaCompatibilityStrategy strategy)
+```json
+{
+  "isAllowAutoUpdateSchema": "false"
+}
 ```
 
-Here is an example of setting a schema compatibility check strategy at the topic level.
+</TabItem>
+<TabItem value="Java Admin API">
 
-```java
-PulsarAdmin admin = …;
+Here is an example of disabling schema auto-update for a tenant/namespace.
 
-admin.topicPolicies().setSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic", SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
+```java
+admin.namespaces().setIsAllowAutoUpdateSchema("my-tenant/my-namespace", false);
 ```
 
 </TabItem>
-
 </Tabs>
 ````
 
-To get the topic-level schema compatibility check strategy, use one of the following methods.
+## Manage schema validation enforcement
+
+### Enable schema validation enforcement
+
+To enable schema validation enforcement at the **cluster** level, set `schemaValidationEnforced` to `true` in the `conf/broker.conf` file.
+
+To enable schema validation enforcement at the **namespace** level, you can use one of the following methods.
 
 ````mdx-code-block
 <Tabs groupId="api-choice"
@@ -493,42 +407,92 @@ To get the topic-level schema compatibility check strategy, use one of the follo
 
 <TabItem value="Admin CLI">
 
-Use the [`pulsar-admin topicPolicies get-schema-compatibility-strategy`](/tools/pulsar-admin/) command. 
+Use the `set-schema-validation-enforce` subcommand. 
 
-```shell
-pulsar-admin topicPolicies get-schema-compatibility-strategy <topicName>
+```bash
+bin/pulsar-admin namespaces set-schema-validation-enforce --enable tenant/namespace
 ```
 
 </TabItem>
 <TabItem value="REST API">
 
-Send a `GET` request to this endpoint: {@inject: endpoint|GET|/admin/v2/topics/:tenant/:namespace/:topic|operation/schemaCompatibilityStrategy?version=@pulsar:version_number@}
+Send a `POST` request to a namespace endpoint: {@inject: endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/schemaValidationEnforced|operation/schemaValidationEnforced?version=@pulsar:version_number@}
+
+The post payload is in JSON format.
+
+```json
+{
+  "schemaValidationEnforced": "true"
+}
+```
 
 </TabItem>
 <TabItem value="Java Admin API">
 
+Here is an example of enabling schema validation enforcement for a tenant/namespace.
+
 ```java
-SchemaCompatibilityStrategy getSchemaCompatibilityStrategy(String topic, boolean applied)
+admin.namespaces().setSchemaValidationEnforced("my-tenant/my-namespace", true);
 ```
 
-Here is an example of getting the topic-level schema compatibility check strategy.
+</TabItem>
+</Tabs>
+````
 
-```java
-PulsarAdmin admin = …;
+### Disable schema validation enforcement
 
-// get the current applied schema compatibility strategy
-admin.topicPolicies().getSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic", true);
+To disable schema validation enforcement at the **namespace** level, you can use one of the following methods.
 
-// only get the schema compatibility strategy from topic policies
-admin.topicPolicies().getSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic", false);
+````mdx-code-block
+<Tabs groupId="api-choice"
+  defaultValue="Admin CLI"
+  values={[{"label":"Admin CLI","value":"Admin CLI"},{"label":"REST API","value":"REST API"},{"label":"Java Admin API","value":"Java Admin API"}]}>
+
+<TabItem value="Admin CLI">
+
+Use the `set-schema-validation-enforce` subcommand. 
+
+```bash
+bin/pulsar-admin namespaces set-schema-validation-enforce --disable tenant/namespace
+```
+
+</TabItem>
+<TabItem value="REST API">
+
+Send a `POST` request to a namespace endpoint: {@inject: endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/schemaValidationEnforced|operation/schemaValidationEnforced?version=@pulsar:version_number@}
+
+The post payload is in JSON format.
+
+```json
+{
+  "schemaValidationEnforced": "false"
+}
 ```
 
 </TabItem>
+<TabItem value="Java Admin API">
+
+Here is an example of disabling schema validation enforcement for a tenant/namespace.
 
+```java
+admin.namespaces().setSchemaValidationEnforced("my-tenant/my-namespace", false);
+```
+
+</TabItem>
 </Tabs>
 ````
 
-To remove the topic-level schema compatibility check strategy, use one of the following methods.
+## Manage schema compatibility strategy 
+
+The [schema compatibility check strategy](schema-understand.md#schema-compatibility-check-strategy) configured at different levels has priority: topic level > namespace level > cluster level. In other words:
+  * If you set the strategy at both topic and namespace levels, the topic-level strategy is used. 
+  * If you set the strategy at both namespace and cluster levels, the namespace-level strategy is used. 
+
+### Set schema compatibility strategy
+
+#### Set topic-level schema compatibility strategy
+
+To set a schema compatibility check strategy at the topic level, you can use one of the following methods.
 
 ````mdx-code-block
 <Tabs groupId="api-choice"
@@ -537,40 +501,39 @@ To remove the topic-level schema compatibility check strategy, use one of the fo
 
 <TabItem value="Admin CLI">
 
-Use the [`pulsar-admin topicPolicies remove-schema-compatibility-strategy`](/tools/pulsar-admin/) command. 
+Use the [`pulsar-admin topicPolicies set-schema-compatibility-strategy`](/tools/pulsar-admin/) command. 
 
 ```shell
-pulsar-admin topicPolicies remove-schema-compatibility-strategy <topicName>
+pulsar-admin topicPolicies set-schema-compatibility-strategy <strategy> <topicName>
 ```
 
 </TabItem>
 <TabItem value="REST API">
 
-Send a `DELETE` request to this endpoint: {@inject: endpoint|DELETE|/admin/v2/topics/:tenant/:namespace/:topic|operation/schemaCompatibilityStrategy?version=@pulsar:version_number@}
+Send a `PUT` request to this endpoint: {@inject: endpoint|PUT|/admin/v2/topics/:tenant/:namespace/:topic|operation/schemaCompatibilityStrategy?version=@pulsar:version_number@}
 
 </TabItem>
 <TabItem value="Java Admin API">
 
 ```java
-void removeSchemaCompatibilityStrategy(String topic)
+void setSchemaCompatibilityStrategy(String topic, SchemaCompatibilityStrategy strategy)
 ```
 
-Here is an example of removing the topic-level schema compatibility check strategy.
+Here is an example of setting a schema compatibility check strategy at the topic level.
 
 ```java
 PulsarAdmin admin = …;
 
-admin.removeSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic");
+admin.topicPolicies().setSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic", SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
 ```
 
 </TabItem>
-
 </Tabs>
 ````
 
-### Namespace level
+#### Set namespace-level schema compatibility strategy
 
-You can set schema compatibility check strategy at namespace level using one of the following methods.
+To set the schema compatibility check strategy at the namespace level, you can use one of the following methods.
 
 ````mdx-code-block
 <Tabs groupId="api-choice"
@@ -600,14 +563,97 @@ admin.namespaces().setSchemaCompatibilityStrategy("test", SchemaCompatibilityStr
 ```
 
 </TabItem>
-
 </Tabs>
 ````
 
-### Broker level
+#### Set cluster-level schema compatibility strategy
 
-You can set schema compatibility check strategy at broker level by setting `schemaCompatibilityStrategy` in `conf/broker.conf` or `conf/standalone.conf` file.
+To set the schema compatibility check strategy at the **cluster** level, set `schemaCompatibilityStrategy` in the `conf/broker.conf` file.
+
+The following is an example:
 
 ```conf
 schemaCompatibilityStrategy=ALWAYS_INCOMPATIBLE
-```
\ No newline at end of file
+```
+
+### Get schema compatibility strategy
+
+#### Get topic-level schema compatibility strategy
+
+To get the topic-level schema compatibility check strategy, you can use one of the following methods.
+
+````mdx-code-block
+<Tabs groupId="api-choice"
+  defaultValue="Admin CLI"
+  values={[{"label":"Admin CLI","value":"Admin CLI"},{"label":"REST API","value":"REST API"},{"label":"Java Admin API","value":"Java Admin API"}]}>
+
+<TabItem value="Admin CLI">
+
+Use the [`pulsar-admin topicPolicies get-schema-compatibility-strategy`](/tools/pulsar-admin/) command. 
+
+```shell
+pulsar-admin topicPolicies get-schema-compatibility-strategy <topicName>
+```
+
+</TabItem>
+<TabItem value="REST API">
+
+Send a `GET` request to this endpoint: {@inject: endpoint|GET|/admin/v2/topics/:tenant/:namespace/:topic|operation/schemaCompatibilityStrategy?version=@pulsar:version_number@}
+
+</TabItem>
+<TabItem value="Java Admin API">
+
+```java
+SchemaCompatibilityStrategy getSchemaCompatibilityStrategy(String topic, boolean applied)
+```
+
+Here is an example of getting the topic-level schema compatibility check strategy.
+
+```java
+PulsarAdmin admin = …;
+
+// get the current applied schema compatibility strategy
+admin.topicPolicies().getSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic", true);
+
+// only get the schema compatibility strategy from topic policies
+admin.topicPolicies().getSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic", false);
+```
+
+</TabItem>
+</Tabs>
+````
+
+#### Get namespace-level schema compatibility strategy
+
+To get the schema compatibility check strategy at the namespace level, use one of the following methods.
+
+````mdx-code-block
+<Tabs groupId="api-choice"
+  defaultValue="Admin CLI"
+  values={[{"label":"Admin CLI","value":"Admin CLI"},{"label":"REST API","value":"REST API"},{"label":"Java Admin API","value":"Java Admin API"}]}>
+
+<TabItem value="Admin CLI">
+
+Use the [`pulsar-admin namespaces get-schema-compatibility-strategy`](/tools/pulsar-admin/) command. 
+
+```shell
+pulsar-admin namespaces get-schema-compatibility-strategy tenant/namespace
+```
+
+</TabItem>
+<TabItem value="REST API">
+
+Send a `GET` request to this endpoint: {@inject: endpoint|GET|/admin/v2/namespaces/:tenant/:namespace|operation/schemaCompatibilityStrategy?version=@pulsar:version_number@}
+
+</TabItem>
+<TabItem value="Java Admin API">
+
+Use the [`getSchemaCompatibilityStrategy`](/api/admin/) method.
+
+```java
+admin.namespaces().getSchemaCompatibilityStrategy("test");
+```
+
+</TabItem>
+</Tabs>
+````
\ No newline at end of file
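The priority rule introduced in the new "Manage schema compatibility strategy" section of admin-api-schemas.md (topic level > namespace level > cluster level) amounts to a fallback chain. The following is an illustrative sketch of that resolution order, not Pulsar's broker code; the function name and string strategy values are hypothetical stand-ins.

```python
# Illustrative only: resolve the effective schema compatibility strategy
# using the documented priority: topic > namespace > cluster.
def effective_strategy(topic_level=None, namespace_level=None, cluster_level=None):
    """Return the first strategy that is set, most specific level first."""
    for strategy in (topic_level, namespace_level, cluster_level):
        if strategy is not None:
            return strategy
    return None

# A topic-level setting wins over namespace- and cluster-level ones.
print(effective_strategy(topic_level="ALWAYS_INCOMPATIBLE",
                         namespace_level="FULL",
                         cluster_level="BACKWARD"))  # ALWAYS_INCOMPATIBLE
```

With no topic-level value, the namespace-level strategy applies, and so on down to the cluster default.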
diff --git a/site2/website-next/docs/client-libraries-python.md b/site2/website-next/docs/client-libraries-python.md
index 3c17c2e519c..8642ad49df1 100644
--- a/site2/website-next/docs/client-libraries-python.md
+++ b/site2/website-next/docs/client-libraries-python.md
@@ -295,8 +295,6 @@ The schema definition is like this.
 
 ### Declare and validate schema
 
-You can send messages using `BytesSchema`, `StringSchema`, `AvroSchema`, and `JsonSchema`.
-
 Before the producer is created, the Pulsar broker validates that the existing topic schema is the correct type and that the format is compatible with the schema definition of a class. If the format of the topic schema is incompatible with the schema definition, an exception occurs in the producer creation.
 
 Once a producer is created with a certain schema definition, it only accepts objects that are instances of the declared schema class.
@@ -323,176 +321,7 @@ while True:
         consumer.negative_acknowledge(msg)
 ```
 
-````mdx-code-block
-<Tabs
-  defaultValue="BytesSchema"
-  values={[{"label":"BytesSchema","value":"BytesSchema"},{"label":"StringSchema","value":"StringSchema"},{"label":"AvroSchema","value":"AvroSchema"},{"label":"JsonSchema","value":"JsonSchema"}]}>
-
-<TabItem value="BytesSchema">
-
-You can send byte data using a `BytesSchema`.
-
-**Example**
-
-```python
-producer = client.create_producer(
-                'bytes-schema-topic',
-                schema=BytesSchema())
-producer.send(b"Hello")
-
-consumer = client.subscribe(
-				'bytes-schema-topic',
-				'sub',
-				schema=BytesSchema())
-msg = consumer.receive()
-data = msg.value()
-```
-
-</TabItem>
-<TabItem value="StringSchema">
-
-You can send string data using a `StringSchema`.
-
-**Example**
-
-```python
-producer = client.create_producer(
-                'string-schema-topic',
-                schema=StringSchema())
-producer.send("Hello")
-
-consumer = client.subscribe(
-				'string-schema-topic',
-				'sub',
-				schema=StringSchema())
-msg = consumer.receive()
-str = msg.value()
-```
-
-</TabItem>
-<TabItem value="AvroSchema">
-
-You can declare an `AvroSchema` using one of the following methods.
-
-#### Method 1: Record
-
-You can declare an `AvroSchema` by passing a class that inherits
-from `pulsar.schema.Record` and defines the fields as
-class variables.
-
-**Example**
-
-```python
-class Example(Record):
-    a = Integer()
-    b = Integer()
-
-producer = client.create_producer(
-                'avro-schema-topic',
-                schema=AvroSchema(Example))
-r = Example(a=1, b=2)
-producer.send(r)
-
-consumer = client.subscribe(
-				'avro-schema-topic',
-				'sub',
-				schema=AvroSchema(Example))
-msg = consumer.receive()
-e = msg.value()
-```
-
-#### Method 2: JSON definition
-
-You can declare an `AvroSchema` using JSON. In this case, Avro schemas are defined using JSON.
-
-**Example**
-
-Below is an `AvroSchema` defined using a JSON file (_company.avsc_).
-
-```json
-{
-    "doc": "this is doc",
-    "namespace": "example.avro",
-    "type": "record",
-    "name": "Company",
-    "fields": [
-        {"name": "name", "type": ["null", "string"]},
-        {"name": "address", "type": ["null", "string"]},
-        {"name": "employees", "type": ["null", {"type": "array", "items": {
-            "type": "record",
-            "name": "Employee",
-            "fields": [
-                {"name": "name", "type": ["null", "string"]},
-                {"name": "age", "type": ["null", "int"]}
-            ]
-        }}]},
-        {"name": "labels", "type": ["null", {"type": "map", "values": "string"}]}
-    ]
-}
-```
-
-You can load a schema definition from file by using [`avro.schema`](https://avro.apache.org/docs/current/getting-started-python/) or [`fastavro.schema`](https://fastavro.readthedocs.io/en/latest/schema.html#fastavro._schema_py.load_schema).
-
-If you use the "JSON definition" method to declare an `AvroSchema`, pay attention to the following points:
-
-- You need to use [Python dict](https://developers.google.com/edu/python/dict-files) to produce and consume messages, which is different from using the "Record" method.
-
-- When generating an `AvroSchema` object, set `_record_cls` parameter to `None`.
-
-**Example**
-
-```python
-from fastavro.schema import load_schema
-from pulsar.schema import *
-schema_definition = load_schema("examples/company.avsc")
-avro_schema = AvroSchema(None, schema_definition=schema_definition)
-producer = client.create_producer(
-    topic=topic,
-    schema=avro_schema)
-consumer = client.subscribe(topic, 'test', schema=avro_schema)
-company = {
-    "name": "company-name" + str(i),
-    "address": 'xxx road xxx street ' + str(i),
-    "employees": [
-        {"name": "user" + str(i), "age": 20 + i},
-        {"name": "user" + str(i), "age": 30 + i},
-        {"name": "user" + str(i), "age": 35 + i},
-    ],
-    "labels": {
-        "industry": "software" + str(i),
-        "scale": ">100",
-        "funds": "1000000.0"
-    }
-}
-producer.send(company)
-msg = consumer.receive()
-# Users could get a dict object by `value()` method.
-msg.value()
-```
-
-</TabItem>
-<TabItem value="JsonSchema">
-
-#### Record
-
-You can declare a `JsonSchema` by passing a class that inherits
-from `pulsar.schema.Record` and defines the fields as class variables. This is similar to using `AvroSchema`. The only difference is to use  `JsonSchema` instead of `AvroSchema` when defining schema type as shown below. For how to use `AvroSchema` via record, see [heres-python.md#method-1-record).
-
-```python
-producer = client.create_producer(
-                'avro-schema-topic',
-                schema=JsonSchema(Example))
-
-consumer = client.subscribe(
-				'avro-schema-topic',
-				'sub',
-				schema=JsonSchema(Example))
-```
-
-</TabItem>
-
-</Tabs>
-````
+For more code examples, see [Schema - Get started](schema-get-started.md).
 
 ## End-to-end encryption
 
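The Avro JSON definition shown in the removed `_company.avsc_` example (now covered in Schema - Get started) can be parsed and inspected locally before it is wired into a producer. A minimal sketch using only the standard library, with the `employees` record field omitted for brevity:

```python
import json

# Parse a trimmed copy of the _company.avsc_ definition from the docs above
# (employees field omitted for brevity) and inspect its fields.
company_avsc = json.loads("""
{
    "doc": "this is doc",
    "namespace": "example.avro",
    "type": "record",
    "name": "Company",
    "fields": [
        {"name": "name", "type": ["null", "string"]},
        {"name": "address", "type": ["null", "string"]},
        {"name": "labels", "type": ["null", {"type": "map", "values": "string"}]}
    ]
}
""")

field_names = [f["name"] for f in company_avsc["fields"]]
print(company_avsc["name"], field_names)  # Company ['name', 'address', 'labels']
```

For full validation against the Avro specification, the docs point to `avro.schema` or `fastavro.schema` loaders instead of a plain JSON parse.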
diff --git a/site2/website-next/docs/schema-evolution-compatibility.md b/site2/website-next/docs/schema-evolution-compatibility.md
index 438eea638bf..a8a32d3d8f6 100644
--- a/site2/website-next/docs/schema-evolution-compatibility.md
+++ b/site2/website-next/docs/schema-evolution-compatibility.md
@@ -4,167 +4,9 @@ title: Schema evolution and compatibility
 sidebar_label: "Schema evolution and compatibility"
 ---
 
-Normally, schemas do not stay the same over a long period of time. Instead, they undergo evolutions to satisfy new needs. 
+````mdx-code-block
+import useLocationParentPath from '@site/src/libs/useLocationParentPath';
+import {Redirect} from "@docusaurus/router";
 
-This chapter introduces how Pulsar schema evolves and what compatibility check strategies it adopts.
-
-## Schema evolution
-
-The message produced with `SchemaInfo` is tagged with a schema version. When a message is consumed by a Pulsar client, the Pulsar client can use the schema version to retrieve the corresponding `SchemaInfo` and use the correct schema information to deserialize data.
-
-Schemas store the details of attributes and types. To satisfy new business requirements,  you need to update schemas inevitably over time, which is called **schema evolution**. 
-
-Any schema changes affect downstream consumers. Schema evolution ensures that the downstream consumers can seamlessly handle data encoded with both old schemas and new schemas. 
-
-### How schema evolves? 
-
-The answer is [schema compatibility check strategy](#schema-compatibility-check-strategy). It determines how schema compares old schemas with new schemas in topics.
-
-### How Pulsar supports schema evolution?
-
-The process of how Pulsar supports schema evolution is described as follows.
-
-1. The producer/consumer/reader sends the `SchemaInfo` of its client to brokers. 
-   
-2. Brokers recognize the schema type and deploy the schema compatibility checker `schemaRegistryCompatibilityCheckers` for that schema type to enforce the schema compatibility check. By default, the value of `schemaRegistryCompatibilityCheckers` in the `conf/broker.conf` or `conf/standalone.conf` file is as follows.
-   
-   ```properties
-   schemaRegistryCompatibilityCheckers=org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaCompatibilityCheck
-   ```
-
-   :::note
-
-   Each schema type corresponds to one instance of schema compatibility checker. Currently, Avro, JSON, and Protobuf have their own compatibility checkers, while all the other schema types share the default compatibility checker which disables the schema evolution. In a word, schema evolution is only available in Avro, JSON, and Protobuf schema.
-
-   :::
-
-3. Brokers use the schema compatibility checker to check if the `SchemaInfo` is compatible with the latest schema of the topic by applying its [compatibility check strategy](#schema-compatibility-check-strategy). Currently, the compatibility check strategy is configured at the namespace level and applied to all the topics within that namespace.
-
-For more details, see [`schemaRegistryCompatibilityCheckers`](https://github.com/apache/pulsar/blob/bf194b557c48e2d3246e44f1fc28876932d8ecb8/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java).
-
-
-## Schema compatibility check strategy
-
-The following table outlines 8 schema compatibility check strategies and how it works.
-
-Suppose that you have a topic containing three schemas (V1, V2, and V3). V1 is the oldest and V3 is the latest.
-
-|  Compatibility check strategy  |   Definition  |   Changes allowed  |   Check against which schema  |   Upgrade first  | 
-| --- | --- | --- | --- | --- |
-|  `ALWAYS_COMPATIBLE`  |   Disable schema compatibility check.  |   All changes are allowed  |   All previous versions  |   Any order  | 
-|  `ALWAYS_INCOMPATIBLE`  |   Disable schema evolution, that is, any schema change is rejected.  |   All changes are disabled  |   None  |   None  | 
-|  `BACKWARD`  |   Consumers using schema V3 can process data written by producers using the **last schema version** V2.  |   <li>Add optional fields </li><li>Delete fields </li> |   Latest version  |   Consumers  | 
-|  `BACKWARD_TRANSITIVE`  |   Consumers using schema V3 can process data written by producers using **all previous schema versions** V2 and V1.  |   <li>Add optional fields </li><li>Delete fields </li> |   All previous versions  |   Consumers  | 
-|  `FORWARD`  |   Consumers using the **last schema version** V2 can process data written by producers using a new schema V3, even though they may not be able to use the full capabilities of the new schema.  |   <li>Add fields </li><li>Delete optional fields </li> |   Latest version  |   Producers  | 
-|  `FORWARD_TRANSITIVE`  |   Consumers using **all previous schema versions** V2 or V1 can process data written by producers using a new schema V3.  |   <li>Add fields </li><li>Delete optional fields </li> |   All previous versions  |   Producers  | 
-|  `FULL`  |   Schemas are both backward and forward compatible. <li>Consumers using the last schema V2 can process data written by producers using the new schema V3. </li><li>Consumers using the new schema V3 can process data written by producers using the last schema V2.</li>  |   <li>Modify optional fields </li> |   Latest version  |   Any order  | 
-|  `FULL_TRANSITIVE`  |   Backward and forward compatible among schema V3, V2, and V1. <li>Consumers using the schema V3 can process data written by producers using schema V2 and V1. </li><li>Consumers using the schema V2 or V1 can process data written by producers using the schema V3.</li>  |   <li>Modify optional fields </li> |   All previous versions  |   Any order  | 
-
-:::tip
-
-* The default schema compatibility check strategy varies depending on schema types.
-  * For Avro and JSON, the default one is `FULL`.
-  * For others, the default one is `ALWAYS_INCOMPATIBLE`.
-* You can set schema compatibility check strategy at the topic, namespace or broker level. For how to set the strategy, see [here](admin-api-schemas.md#set-schema-compatibility-check-strategy).
-
-:::
-
-### ALWAYS_COMPATIBLE example
-  
-  In some situations, an application needs to store events of several different types in the same Pulsar topic. 
-
-  In particular, when developing a data model in an `Event Sourcing` style, you might have several kinds of events that affect the state of an entity. 
-
-  For example, for a user entity, there are `userCreated`, `userAddressChanged` and `userEnquiryReceived` events. The application requires that those events are always read in the same order. 
-
-  Consequently, those events need to go in the same Pulsar partition to maintain order. This application can use `ALWAYS_COMPATIBLE` to allow different kinds of events to co-exist on the same topic.
-
-### ALWAYS_INCOMPATIBLE example
-
-  Sometimes we also make incompatible changes. For example, you are modifying a field type from `string` to `int`.
-
-  In this case, you need to:
-
-  * Upgrade all producers and consumers to the new schema versions at the same time.
-
-  * Optionally, create a new topic and start migrating applications to use the new topic and the new schema, avoiding the need to handle two incompatible versions in the same topic.
-
-### BACKWARD and BACKWARD_TRANSITIVE example
-  
-* Example 1
-  
-  Remove a field.
-  
-  A consumer constructed to process events without one field can process events written with the old schema containing the field, and the consumer will ignore that field.
-
-* Example 2
-  
-  You want to load all Pulsar data into a Hive data warehouse and run SQL queries against the data. 
-
-  Same SQL queries must continue to work even if the data is changed. To support it, you can evolve the schemas using the `BACKWARD` strategy.
-
-### FORWARD and FORWARD_TRANSITIVE example
-
-* Example 1
-  
-  Add a field.
-  
-  In most data formats, consumers written to process events without new fields can continue doing so even when they receive new events containing new fields.
-
-* Example 2
-  
-  If a consumer has an application logic tied to a full version of a schema, the application logic may not be updated instantly when the schema evolves.
-  
-  In this case, you need to project data with a new schema onto an old schema that the application understands. 
-  
-  Consequently, you can evolve the schemas using the `FORWARD` strategy to ensure that the old schema can process data encoded with the new schema.
-
-### FULL and FULL_TRANSITIVE example
-
-In some data formats, for example, Avro, you can define fields with default values. Consequently, adding or removing a field with a default value is a fully compatible change.
-
-
-## Schema validation
-
-When a producer or a consumer tries to connect to a topic, a broker performs some checks to verify a schema.
-
-### Validation on producers
-
-By default, `schemaValidationEnforced` is **disabled** for producers, which means:
-* A producer without a schema can produce any kind of messages to a topic with schemas, which may result in producing trash data to the topic. 
-* It allows non-java language clients that don’t support schema can produce messages to a topic with schemas.
-
-However, if you want a stronger guarantee on the topics with schemas, you can enable `schemaValidationEnforced` across the whole cluster or on a per-namespace basis. 
-
-With `schemaValidationEnforced` enabled, When a producer tries to connect to a topic (suppose ignore the schema auto-creation), the broker checks if the schema carried by the producer exists in the schema registry or not.
-  * If the schema is already registered, then the producer is connected to a broker and produces messages with that schema.
-  * If the schema is not registered, then Pulsar verifies if the schema is allowed to be registered based on the configured compatibility check strategy.
-  
-### Validation on consumers
-
-When a consumer tries to connect to a topic, a broker checks if a carried schema is compatible with a registered schema based on the configured schema compatibility check strategy.
-
-| Compatibility check strategy | Check logic              |
-|------------------------------|--------------------------|
-| `ALWAYS_COMPATIBLE`          | All pass                 |
-| `ALWAYS_INCOMPATIBLE`        | No pass                  |
-| `BACKWARD`                   | Can read the last schema |
-| `BACKWARD_TRANSITIVE`        | Can read all schemas     |
-| `FORWARD`                    | Can read the last schema |
-| `FORWARD_TRANSITIVE`         | Can read the last schema |
-| `FULL`                       | Can read the last schema |
-| `FULL_TRANSITIVE`            | Can read all schemas     |
-
-## Order of upgrading clients
-
-The order of upgrading client applications is determined by the [schema compatibility check strategy](#schema-compatibility-check-strategy).
-
-For example, the producers use schemas to write data to Pulsar and the consumers use schemas to read data from Pulsar. 
-
-|  Compatibility check strategy  |   Upgrade first  | Description                                                                                                                                                                                                                                                                                                         | 
-| --- | --- |---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-|  `ALWAYS_COMPATIBLE`  |   Any order  | The compatibility check is disabled. Consequently, you can upgrade the producers and consumers in **any order**.                                                                                                                                                                                                    | 
-|  `ALWAYS_INCOMPATIBLE`  |   None  | The schema evolution is disabled.                                                                                                                                                                                                                                                                                   | 
-|  <li>`BACKWARD` </li><li>`BACKWARD_TRANSITIVE` </li> |   Consumers  | There is no guarantee that consumers using the old schema can read data produced using the new schema. Consequently, **upgrade all consumers first**, and then start producing new data.                                                                                                                            | 
-|  <li>`FORWARD` </li><li>`FORWARD_TRANSITIVE` </li> |   Producers  | There is no guarantee that consumers using the new schema can read data produced using the old schema. Consequently, **upgrade all producers first**<li>to use the new schema and ensure that the data already produced using the old schemas are not available to consumers, and then upgrades the consumers. </li> | 
-|  <li>`FULL` </li><li>`FULL_TRANSITIVE` </li> |   Any order  | It is guaranteed that consumers using the old schema can read data produced using the new schema and consumers using the new schema can read data produced using the old schema. Consequently, you can upgrade the producers and consumers in **any order**.                                                        | 
\ No newline at end of file
+<Redirect to={useLocationParentPath() + "schema-understand#schema-evolution"} />
+````
\ No newline at end of file
diff --git a/site2/website-next/docs/schema-get-started.md b/site2/website-next/docs/schema-get-started.md
index 5b6cea677e3..c8aa3a1ce1a 100644
--- a/site2/website-next/docs/schema-get-started.md
+++ b/site2/website-next/docs/schema-get-started.md
@@ -11,55 +11,203 @@ import TabItem from '@theme/TabItem';
 ````
 
 
-This hands-on tutorial provides instructions and examples on how to construct and customize schemas.
+This hands-on tutorial provides instructions and examples on how to construct schemas. For instructions on administrative tasks, see [Manage schema](admin-api-schemas.md).
 
-## Construct a string schema
+## Construct a schema
 
-This example demonstrates how to construct a [string schema](schema-understand.md#primitive-type) and use it to produce and consume messages in Java.
+### bytes
 
-1. Create a producer with a string schema and send messages.
+This example demonstrates how to construct a [bytes schema](schema-understand.md#primitive-type) using language-specific clients and use it to produce and consume messages.
+
+````mdx-code-block
+<Tabs groupId="api-choice"
+  defaultValue="Java"
+  values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"},{"label":"Python","value":"Python"},{"label":"Go","value":"Go"}]}>
+
+<TabItem value="Java">
+
+```java
+Producer<byte[]> producer = client.newProducer(Schema.BYTES)
+       .topic("my-topic")
+       .create();
+Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
+       .topic("my-topic")
+       .subscriptionName("my-sub")
+       .subscribe();
+
+producer.newMessage().value("message".getBytes()).send();
+
+Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+```
+
+</TabItem>
+<TabItem value="C++">
+
+```cpp
+SchemaInfo schemaInfo = SchemaInfo(SchemaType::BYTES, "Bytes", "");
+Producer producer;
+client.createProducer("topic-bytes", ProducerConfiguration().setSchema(schemaInfo), producer);
+std::array<char, 1024> buffer;
+producer.send(MessageBuilder().setContent(buffer.data(), buffer.size()).build());
+Consumer consumer;
+client.subscribe("topic-bytes", "my-sub", ConsumerConfiguration().setSchema(schemaInfo), consumer);
+Message msg;
+consumer.receive(msg, 3000);
+```
+
+</TabItem>
+<TabItem value="Python">
+
+```python
+producer = client.create_producer(
+    'bytes-schema-topic',
+    schema=BytesSchema())
+producer.send(b"Hello")
+
+consumer = client.subscribe(
+    'bytes-schema-topic',
+    'sub',
+    schema=BytesSchema())
+msg = consumer.receive()
+data = msg.value()
+```
+
+</TabItem>
+<TabItem value="Go">
+
+```go
+producer, err := client.CreateProducer(pulsar.ProducerOptions{
+    Topic:  "my-topic",
+    Schema: pulsar.NewBytesSchema(nil),
+})
+id, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
+    Value: []byte("message"),
+})
+
+consumer, err := client.Subscribe(pulsar.ConsumerOptions{
+    Topic:            "my-topic",
+    Schema:           pulsar.NewBytesSchema(nil),
+    SubscriptionName: "my-sub",
+    Type:             pulsar.Exclusive,
+})
+```
+
+</TabItem>
+</Tabs>
+````
+
+### string
+
+This example demonstrates how to construct a [string schema](schema-understand.md#primitive-type) using language-specific clients and use it to produce and consume messages.
+
+````mdx-code-block
+<Tabs groupId="api-choice"
+  defaultValue="Java"
+  values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"},{"label":"Python","value":"Python"},{"label":"Go","value":"Go"}]}>
+
+<TabItem value="Java">
 
    ```java
    Producer<String> producer = client.newProducer(Schema.STRING).create();
    producer.newMessage().value("Hello Pulsar!").send();
-   ```
 
-2. Create a consumer with a string schema and receive messages.  
-
-   ```java
    Consumer<String> consumer = client.newConsumer(Schema.STRING).subscribe();
-   consumer.receive();
+   Message<String> message = consumer.receive();
    ```
 
-## Construct a key/value schema
+</TabItem>
+<TabItem value="C++">
+
+```cpp
+SchemaInfo schemaInfo = SchemaInfo(SchemaType::STRING, "String", "");
+Producer producer;
+client.createProducer("topic-string", ProducerConfiguration().setSchema(schemaInfo), producer);
+producer.send(MessageBuilder().setContent("message").build());
+
+Consumer consumer;
+client.subscribe("topic-string", "my-sub", ConsumerConfiguration().setSchema(schemaInfo), consumer);
+Message msg;
+consumer.receive(msg, 3000);
+```
+
+</TabItem>
+<TabItem value="Python">
+
+```python
+producer = client.create_producer(
+    'string-schema-topic',
+    schema=StringSchema())
+producer.send("Hello")
+
+consumer = client.subscribe(
+    'string-schema-topic',
+    'sub',
+    schema=StringSchema())
+msg = consumer.receive()
+value = msg.value()
+```
+
+</TabItem>
+<TabItem value="Go">
+
+```go
+producer, err := client.CreateProducer(pulsar.ProducerOptions{
+    Topic:  "my-topic",
+    Schema: pulsar.NewStringSchema(nil),
+})
+id, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
+    Value: "message",
+})
+
+consumer, err := client.Subscribe(pulsar.ConsumerOptions{
+    Topic:            "my-topic",
+    Schema:           pulsar.NewStringSchema(nil),
+    SubscriptionName: "my-sub",
+    Type:             pulsar.Exclusive,
+})
+msg, err := consumer.Receive(context.Background())
+```
+
+</TabItem>
+</Tabs>
+````
+
+### key/value
+
+This example shows how to construct a [key/value schema](schema-understand.md#keyvalue-schema) using language-specific clients and use it to produce and consume messages.
 
-This example shows how to construct a [key/value schema](schema-understand.md#keyvalue-schema) and use it to produce and consume messages in Java.
+````mdx-code-block
+<Tabs groupId="api-choice"
+  defaultValue="Java"
+  values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"}]}>
+
+<TabItem value="Java">
 
 1. Construct a key/value schema with `INLINE` encoding type.
 
-   ```java
-   Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
-   Schema.INT32,
-   Schema.STRING,
-   KeyValueEncodingType.INLINE
-   );
-   ```
+    ```java
+    Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
+        Schema.INT32,
+        Schema.STRING,
+        KeyValueEncodingType.INLINE
+    );
+    ```
 
-2. Optionally, construct a key/value schema with `SEPARATED` encoding type.
+   Alternatively, construct a key/value schema with `SEPARATED` encoding type.
 
    ```java
    Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
-   Schema.INT32,
-   Schema.STRING,
-   KeyValueEncodingType.SEPARATED
+       Schema.INT32,
+       Schema.STRING,
+       KeyValueEncodingType.SEPARATED
    );
    ```
 
-3. Produce messages using a key/value schema.
+2. Produce messages using a key/value schema.
 
    ```java
    Producer<KeyValue<Integer, String>> producer = client.newProducer(kvSchema)
-       .topic(TOPIC)
+       .topic(topicName)
        .create();
 
    final int key = 100;
@@ -67,134 +215,76 @@ This example shows how to construct a [key/value schema](schema-understand.md#ke
 
    // send the key/value message
    producer.newMessage()
-   .value(new KeyValue(key, value))
-   .send();
+       .value(new KeyValue(key, value))
+       .send();
    ```
 
-4. Consume messages using a key/value schema.
+3. Consume messages using a key/value schema.
 
    ```java
    Consumer<KeyValue<Integer, String>> consumer = client.newConsumer(kvSchema)
        ...
-       .topic(TOPIC)
-       .subscriptionName(SubscriptionName).subscribe();
+       .topic(topicName)
+       .subscriptionName(subscriptionName).subscribe();
 
    // receive key/value pair
    Message<KeyValue<Integer, String>> msg = consumer.receive();
    KeyValue<Integer, String> kv = msg.getValue();
    ```
 
-## Construct a struct schema
-
-This example shows how to construct a [struct schema](schema-understand.md#struct-schema) and use it to produce and consume messages using different methods.
-
-````mdx-code-block
-<Tabs 
-  defaultValue="static"
-  values={[{"label":"static","value":"static"},{"label":"generic","value":"generic"},{"label":"SchemaDefinition","value":"SchemaDefinition"}]}>
-
-<TabItem value="static">
-
-You can predefine the `struct` schema, which can be a POJO in Java, a `struct` in Go, or classes generated by Avro or Protobuf tools. 
-
-**Example** 
-
-Pulsar gets the schema definition from the predefined `struct` using an Avro library. The schema definition is the schema data stored as a part of the `SchemaInfo`.
-
-1. Create the _User_ class to define the messages sent to Pulsar topics.
-
-   ```java
-   @Builder
-   @AllArgsConstructor
-   @NoArgsConstructor
-   public static class User {
-       String name;
-       int age;
-   }
-   ```
-
-2. Create a producer with a `struct` schema and send messages.
-
-   ```java
-   Producer<User> producer = client.newProducer(Schema.AVRO(User.class)).create();
-   producer.newMessage().value(User.builder().name("pulsar-user").age(1).build()).send();
-   ```
-
-3. Create a consumer with a `struct` schema and receive messages
-
-   ```java
-   Consumer<User> consumer = client.newConsumer(Schema.AVRO(User.class)).subscribe();
-   User user = consumer.receive().getValue();
-   ```
-
 </TabItem>
-<TabItem value="generic">
-
-Sometimes applications do not have pre-defined structs, and you can use this method to define schema and access data.
-
-You can define the `struct` schema using the `GenericSchemaBuilder`, generate a generic struct using `GenericRecordBuilder` and consume messages into `GenericRecord`.
-
-**Example** 
+<TabItem value="C++">
 
-1. Use `RecordSchemaBuilder` to build a schema.
-
-   ```java
-   RecordSchemaBuilder recordSchemaBuilder = SchemaBuilder.record("schemaName");
-   recordSchemaBuilder.field("intField").type(SchemaType.INT32);
-   SchemaInfo schemaInfo = recordSchemaBuilder.build(SchemaType.AVRO);
-
-   Producer<GenericRecord> producer = client.newProducer(Schema.generic(schemaInfo)).create();
-   ```
-
-2. Use `RecordBuilder` to build the struct records.
+1. Construct a key/value schema with `INLINE` encoding type.
 
-   ```java
-   producer.newMessage().value(schema.newRecordBuilder()
-               .set("intField", 32)
-               .build()).send();
+   ```cpp
+   // Prepare the key/value schema
+   std::string jsonSchema =
+       R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+   SchemaInfo keySchema(JSON, "key-json", jsonSchema);
+   SchemaInfo valueSchema(JSON, "value-json", jsonSchema);
+   SchemaInfo keyValueSchema(keySchema, valueSchema, KeyValueEncodingType::INLINE);
    ```
 
-</TabItem>
-<TabItem value="SchemaDefinition">
-
-You can define the `schemaDefinition` to generate a `struct` schema.
+2. Produce messages using a key/value schema.
 
-**Example** 
-
-1. Create the _User_ class to define the messages sent to Pulsar topics.
-
-   ```java
-   @Builder
-   @AllArgsConstructor
-   @NoArgsConstructor
-   public static class User {
-       String name;
-       int age;
-   }
+   ```cpp
+   //Create Producer
+   Producer producer;
+   client.createProducer("my-topic", ProducerConfiguration().setSchema(keyValueSchema), producer);
+
+   // Prepare the message. Use two separate strings: moving from the
+   // same string twice would leave one of the arguments empty.
+   std::string jsonKey = "{\"re\":2.1,\"im\":1.23}";
+   std::string jsonValue = "{\"re\":2.1,\"im\":1.23}";
+   KeyValue keyValue(std::move(jsonKey), std::move(jsonValue));
+   Message msg = MessageBuilder().setContent(keyValue).setProperty("x", "1").build();
+   //Send message
+   producer.send(msg);
    ```
 
-2. Create a producer with a `SchemaDefinition` and send messages.
-
-   ```java
-   SchemaDefinition<User> schemaDefinition = SchemaDefinition.<User>builder().withPojo(User.class).build();
-   Producer<User> producer = client.newProducer(Schema.AVRO(schemaDefinition)).create();
-   producer.newMessage().value(User.builder().name("pulsar-user").age(1).build()).send();
-   ```
+3. Consume messages using a key/value schema.
 
-3. Create a consumer with a `SchemaDefinition` schema and receive messages
+   ```cpp
+   //Create Consumer
+   Consumer consumer;
+   client.subscribe("my-topic", "my-sub", ConsumerConfiguration().setSchema(keyValueSchema), consumer);
 
-   ```java
-   SchemaDefinition<User> schemaDefinition = SchemaDefinition.<User>builder().withPojo(User.class).build();
-   Consumer<User> consumer = client.newConsumer(Schema.AVRO(schemaDefinition)).subscribe();
-   User user = consumer.receive().getValue();
+   //Receive message
+   Message message;
+   consumer.receive(message);
    ```
 
 </TabItem>
-
 </Tabs>
 ````
 
-### Avro schema using Java
+### Avro
+
+````mdx-code-block
+<Tabs groupId="api-choice"
+  defaultValue="Java"
+  values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"},{"label":"Python","value":"Python"},{"label":"Go","value":"Go"}]}>
+
+<TabItem value="Java">
 
 Suppose you have a `SensorReading` class as follows, and you'd like to transmit it over a Pulsar topic.
 
@@ -228,88 +318,350 @@ Producer<SensorReading> producer = client.newProducer(AvroSchema.of(SensorReadin
         .create();
 ```
 
-### Avro-based schema using Java
-
-The following schema formats are currently available for Java:
+</TabItem>
+<TabItem value="C++">
 
-* No schema or the byte array schema (which can be applied using `Schema.BYTES`):
+  ```cpp
+  // Send messages
+  static const std::string exampleSchema =
+      "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
+      "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
+  Producer producer;
+  ProducerConfiguration producerConf;
+  producerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
+  client.createProducer("topic-avro", producerConf, producer);
 
-  ```java
-  Producer<byte[]> bytesProducer = client.newProducer(Schema.BYTES)
-      .topic("some-raw-bytes-topic")
-      .create();
+  // Receive messages
+  ConsumerConfiguration consumerConf;
+  Consumer consumer;
+  consumerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
+  client.subscribe("topic-avro", "sub-2", consumerConf, consumer);
   ```
 
-  Or, equivalently:
+</TabItem>
+<TabItem value="Python">
 
-  ```java
-  Producer<byte[]> bytesProducer = client.newProducer()
-      .topic("some-raw-bytes-topic")
-      .create();
-  ```
+You can declare an `AvroSchema` using Python through one of the following methods.
 
-* `String` for normal UTF-8-encoded string data. Apply the schema using `Schema.STRING`:
+**Method 1: Record**
 
-  ```java
-  Producer<String> stringProducer = client.newProducer(Schema.STRING)
-      .topic("some-string-topic")
-      .create();
-  ```
+Declare an `AvroSchema` by passing a class that inherits from `pulsar.schema.Record` and defines the fields as class variables.
 
-* Create JSON schemas for POJOs using `Schema.JSON`. The following is an example.
+```python
+class Example(Record):
+    a = Integer()
+    b = Integer()
 
-  ```java
-  Producer<MyPojo> pojoProducer = client.newProducer(Schema.JSON(MyPojo.class))
-      .topic("some-pojo-topic")
-      .create();
-  ```
+producer = client.create_producer(
+    'avro-schema-topic',
+    schema=AvroSchema(Example))
+r = Example(a=1, b=2)
+producer.send(r)
 
-* Generate Protobuf schemas using `Schema.PROTOBUF`. The following example shows how to create the Protobuf schema and use it to instantiate a new producer:
+consumer = client.subscribe(
+    'avro-schema-topic',
+    'sub',
+    schema=AvroSchema(Example))
+msg = consumer.receive()
+e = msg.value()
+```
 
-  ```java
-  Producer<MyProtobuf> protobufProducer = client.newProducer(Schema.PROTOBUF(MyProtobuf.class))
-      .topic("some-protobuf-topic")
-      .create();
-  ```
+**Method 2: JSON definition**
+
+1. Declare an `AvroSchema` using a JSON definition (Avro schemas are themselves written in JSON).
+
+   Below is an example schema definition stored in a JSON file (`company.avsc`).
+
+   ```json
+   {
+       "doc": "this is doc",
+       "namespace": "example.avro",
+       "type": "record",
+       "name": "Company",
+       "fields": [
+           {"name": "name", "type": ["null", "string"]},
+           {"name": "address", "type": ["null", "string"]},
+           {"name": "employees", "type": ["null", {"type": "array", "items": {
+               "type": "record",
+               "name": "Employee",
+               "fields": [
+                   {"name": "name", "type": ["null", "string"]},
+                   {"name": "age", "type": ["null", "int"]}
+               ]
+           }}]},
+           {"name": "labels", "type": ["null", {"type": "map", "values": "string"}]}
+       ]
+   }
+   ```
 
-* Define Avro schemas with `Schema.AVRO`. The following code snippet demonstrates how to create and use Avro schema.
+2. Load a schema definition from a file by using [`avro.schema`](https://avro.apache.org/docs/current/getting-started-python/) or [`fastavro.schema`](https://fastavro.readthedocs.io/en/latest/schema.html#fastavro._schema_py.load_schema).
+
+   If you use the [JSON definition](#method-2-json-definition) method to declare an `AvroSchema`, you need to:
+   - Use [Python dict](https://developers.google.com/edu/python/dict-files) to produce and consume messages, which is different from using the [Record](#method-1-record) method.
+   - Set the value of the `_record_cls` parameter to `None` when generating an `AvroSchema` object.
+
+   **Example**
+
+   ```python
+   from fastavro.schema import load_schema
+   from pulsar.schema import *
+   schema_definition = load_schema("examples/company.avsc")
+   avro_schema = AvroSchema(None, schema_definition=schema_definition)
+   producer = client.create_producer(
+       topic=topic,
+       schema=avro_schema)
+   consumer = client.subscribe(topic, 'test', schema=avro_schema)
+   company = {
+       "name": "company-name",
+       "address": "xxx road xxx street",
+       "employees": [
+           {"name": "user1", "age": 20},
+           {"name": "user2", "age": 30},
+           {"name": "user3", "age": 35},
+       ],
+       "labels": {
+           "industry": "software",
+           "scale": ">100",
+           "funds": "1000000.0"
+       }
+   }
+   producer.send(company)
+   msg = consumer.receive()
+   # value() returns a plain Python dict.
+   company_dict = msg.value()
+   ```
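As a stand-alone illustration (no broker required, using only the standard library), the dict messages used with the JSON-definition method mirror the field layout of the schema definition. The sketch below checks a message dict against the top-level field names of an avsc-style definition; the `field_names` helper and the trimmed schema are illustrative, not part of the Pulsar client API.

```python
import json

# An avsc-style definition in the spirit of the company.avsc example above.
schema_definition = json.loads("""
{
    "type": "record",
    "name": "Company",
    "fields": [
        {"name": "name", "type": ["null", "string"]},
        {"name": "address", "type": ["null", "string"]},
        {"name": "employees", "type": ["null", {"type": "array", "items": {
            "type": "record", "name": "Employee",
            "fields": [{"name": "name", "type": ["null", "string"]},
                       {"name": "age", "type": ["null", "int"]}]}}]},
        {"name": "labels", "type": ["null", {"type": "map", "values": "string"}]}
    ]
}
""")

def field_names(definition):
    # Collect the top-level field names declared by the record schema.
    return {f["name"] for f in definition["fields"]}

# A message dict shaped like the ones sent and received above.
company = {
    "name": "company-name",
    "address": "xxx road xxx street",
    "employees": [{"name": "user1", "age": 20}],
    "labels": {"industry": "software"},
}

# Every key in the message dict must be a declared field.
unknown = set(company) - field_names(schema_definition)
print(sorted(unknown))  # []
```

This is only a sanity-check sketch; actual schema validation of values and types is performed by the Avro library and the broker.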
 
-  ```java
-  Producer<MyAvro> avroProducer = client.newProducer(Schema.AVRO(MyAvro.class))
-      .topic("some-avro-topic")
-      .create();
-  ```
+</TabItem>
+<TabItem value="Go">
 
+Suppose you have the following Go struct, `avroExampleStruct`, and you'd like to transmit it over a Pulsar topic.
 
-### Avro schema using C++
+```go
+type avroExampleStruct struct {
+    ID   int
+    Name string
+}
+```
 
-- The following example shows how to create a producer with an Avro schema.
+1. Add an `exampleSchemaDef` schema definition string like this:
 
-  ```cpp
-  static const std::string exampleSchema =
-      "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
-      "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
-  Producer producer;
-  ProducerConfiguration producerConf;
-  producerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
-  client.createProducer("topic-avro", producerConf, producer);
-  ```
+   ```go
+   var (
+       exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
+           "\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
+   )
+   ```
 
-- The following example shows how to create a consumer with an Avro schema.
+2. Create producer and consumer to send/receive messages:
+
+   ```go
+   //Create producer and send message
+   producer, err := client.CreateProducer(pulsar.ProducerOptions{
+       Topic:  "my-topic",
+       Schema: pulsar.NewAvroSchema(exampleSchemaDef, nil),
+   })
+
+   msgId, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
+       Value: avroExampleStruct{
+          ID:   10,
+          Name: "avroExampleStruct",
+       },
+   })
+
+   //Create Consumer and receive message
+   consumer, err := client.Subscribe(pulsar.ConsumerOptions{
+       Topic:            "my-topic",
+       Schema:           pulsar.NewAvroSchema(exampleSchemaDef, nil),
+       SubscriptionName: "my-sub",
+       Type:             pulsar.Shared,
+   })
+   message, err := consumer.Receive(context.Background())
+   ```
 
-  ```cpp
-  static const std::string exampleSchema =
-      "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
-      "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
-  ConsumerConfiguration consumerConf;
-  Consumer consumer;
-  consumerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
-  client.subscribe("topic-avro", "sub-2", consumerConf, consumer)
-  ```
+</TabItem>
+</Tabs>
+````
+
+### JSON
+
+````mdx-code-block
+<Tabs groupId="api-choice"
+  defaultValue="Java"
+  values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"},{"label":"Python","value":"Python"},{"label":"Go","value":"Go"}]}>
+
+<TabItem value="Java">
+
+Similar to using `AvroSchema`, you can declare a `JsonSchema` by passing a class. The only difference is to use `JsonSchema` instead of `AvroSchema` when defining the schema type, as shown below. For how to use `AvroSchema` via record, see [Method 1 - Record](#method-1-record).
+
+```java
+static class SchemaDemo {
+   public String name;
+   public int age;
+}
+
+Producer<SchemaDemo> producer = pulsarClient.newProducer(Schema.JSON(SchemaDemo.class))
+       .topic("my-topic")
+       .create();
+Consumer<SchemaDemo> consumer = pulsarClient.newConsumer(Schema.JSON(SchemaDemo.class))
+       .topic("my-topic")
+       .subscriptionName("my-sub")
+       .subscribe();
+
+SchemaDemo schemaDemo = new SchemaDemo();
+schemaDemo.name = "pulsar";
+schemaDemo.age = 20;
+producer.newMessage().value(schemaDemo).send();
+
+Message<SchemaDemo> message = consumer.receive(5, TimeUnit.SECONDS);
+```
+
+</TabItem>
+<TabItem value="C++">
+
+To declare a `JSON` schema using C++, do the following:
+
+1. Pass a JSON string like this:
+
+   ```cpp
+   std::string jsonSchema = R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+   SchemaInfo schemaInfo = SchemaInfo(JSON, "JSON", jsonSchema);
+   ```
+
+2. Create a producer and use it to send messages.
+
+   ```cpp
+   client.createProducer("my-topic", ProducerConfiguration().setSchema(schemaInfo), producer);
+   std::string jsonData = "{\"re\":2.1,\"im\":1.23}";
+   producer.send(MessageBuilder().setContent(std::move(jsonData)).build());
+   ```
+
+3. Create a consumer and receive messages.
+
+   ```cpp
+   Consumer consumer;
+   client.subscribe("my-topic", "my-sub", ConsumerConfiguration().setSchema(schemaInfo), consumer);
+   Message msg;
+   consumer.receive(msg);
+   ```
 
-### ProtobufNative schema using C++
+</TabItem>
+<TabItem value="Python">
+
+You can declare a `JsonSchema` by passing a class that inherits from `pulsar.schema.Record` and defines the fields as class variables, similar to using `AvroSchema`. The only difference is to use `JsonSchema` instead of `AvroSchema` when defining the schema type, as shown below. For how to use `AvroSchema` via record, see [Method 1 - Record](#method-1-record).
+
+```python
+producer = client.create_producer(
+    'json-schema-topic',
+    schema=JsonSchema(Example))
+
+consumer = client.subscribe(
+    'json-schema-topic',
+    'sub',
+    schema=JsonSchema(Example))
+```
+
+</TabItem>
+<TabItem value="Go">
+
+Suppose you have a `jsonExampleStruct` struct as follows, and you'd like to transmit it in JSON form over a Pulsar topic.
+
+```go
+type jsonExampleStruct struct {
+    ID   int    `json:"id"`
+    Name string `json:"name"`
+}
+```
+
+1. Add a `jsonSchemaDef` like this:
+
+   ```go
+   jsonSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
+       "\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
+   ```
+
+2. Create a producer/consumer to send/receive messages:
+
+   ```go
+   //Create producer and send message
+   producer, err := client.CreateProducer(pulsar.ProducerOptions{
+       Topic:  "my-topic",
+       Schema: pulsar.NewJSONSchema(jsonSchemaDef, nil),
+   })
+
+   msgId, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
+       Value: jsonExampleStruct{
+           ID:   10,
+           Name: "jsonExampleStruct",
+       },
+   })
+
+   //Create Consumer and receive message
+   consumer, err := client.Subscribe(pulsar.ConsumerOptions{
+       Topic:            "my-topic",
+       Schema:           pulsar.NewJSONSchema(jsonSchemaDef, nil),
+       SubscriptionName: "my-sub",
+       Type:             pulsar.Exclusive,
+   })
+   message, err := consumer.Receive(context.Background())
+   ```
+
+</TabItem>
+</Tabs>
+````
+
+### ProtobufNative
+
+````mdx-code-block
+<Tabs groupId="api-choice"
+  defaultValue="Java"
+  values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"}]}>
+
+<TabItem value="Java">
+
+The following example shows how to create a producer/consumer with a ProtobufNative schema using Java.
+
+1. Generate the `DemoMessage` class using Protobuf3 or later versions.
+
+   ```protobuf
+   syntax = "proto3";
+   message DemoMessage {
+      string stringField = 1;
+      double doubleField = 2;
+      int32 intField = 6;
+      TestEnum testEnum = 4;
+      SubMessage nestedField = 5;
+      repeated string repeatedField = 10;
+      proto.external.ExternalMessage externalMessage = 11;
+   }
+   ```
+
+2. Create a producer/consumer to send/receive messages.
+
+   ```java
+   Producer<DemoMessage> producer = pulsarClient.newProducer(Schema.PROTOBUF_NATIVE(DemoMessage.class))
+       .topic("my-topic")
+       .create();
+   Consumer<DemoMessage> consumer = pulsarClient.newConsumer(Schema.PROTOBUF_NATIVE(DemoMessage.class))
+       .topic("my-topic")
+       .subscriptionName("my-sub")
+       .subscribe();
+
+   producer.newMessage().value(DemoMessage.newBuilder().setStringField("string-field-value")
+       .setIntField(1).build()).send();
+
+   Message<DemoMessage> message = consumer.receive(5, TimeUnit.SECONDS);
+   ```
+  
+</TabItem>
+<TabItem value="C++">
 
-The following example shows how to create a producer and a consumer with a ProtobufNative schema.
+The following example shows how to create a producer/consumer with a ProtobufNative schema.
 
 1. Generate the `User` class using Protobuf3 or later versions.
 
@@ -357,26 +709,165 @@ The following example shows how to create a producer and a consumer with a Proto
    user2.ParseFromArray(msg.getData(), msg.getLength());
    ```
 
+</TabItem>
+</Tabs>
+````
+
+### Protobuf
+
+````mdx-code-block
+<Tabs groupId="api-choice"
+  defaultValue="Java"
+  values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"},{"label":"Go","value":"Go"}]}>
+
+<TabItem value="Java">
+
+Constructing a protobuf schema using Java is similar to constructing a `ProtobufNative` schema. The only difference is to use `PROTOBUF` instead of `PROTOBUF_NATIVE` when defining schema type as shown below.
+
+1. Generate the `DemoMessage` class using Protobuf3 or later versions.
+
+   ```protobuf
+   syntax = "proto3";
+   message DemoMessage {
+      string stringField = 1;
+      double doubleField = 2;
+      int32 intField = 6;
+      TestEnum testEnum = 4;
+      SubMessage nestedField = 5;
+      repeated string repeatedField = 10;
+      proto.external.ExternalMessage externalMessage = 11;
+   }
+   ```
+
+2. Create a producer/consumer to send/receive messages.
+
+   ```java
+   Producer<DemoMessage> producer = pulsarClient.newProducer(Schema.PROTOBUF(DemoMessage.class))
+          .topic("my-topic")
+          .create();
+   Consumer<DemoMessage> consumer = pulsarClient.newConsumer(Schema.PROTOBUF(DemoMessage.class))
+          .topic("my-topic")
+          .subscriptionName("my-sub")
+          .subscribe();
+
+   producer.newMessage().value(DemoMessage.newBuilder().setStringField("string-field-value")
+       .setIntField(1).build()).send();
+
+   Message<DemoMessage> message = consumer.receive(5, TimeUnit.SECONDS);
+   ```
+
+</TabItem>
+<TabItem value="C++">
+
+Constructing a Protobuf schema using C++ is similar to constructing a JSON schema. The only difference is to use `PROTOBUF` instead of `JSON` when defining the schema type, as shown below.
+
+```cpp
+std::string jsonSchema =
+   R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
+SchemaInfo schemaInfo = SchemaInfo(pulsar::PROTOBUF, "PROTOBUF", jsonSchema);
+```
+
+1. Create a producer to send messages.
+
+   ```cpp
+   Producer producer;
+   client.createProducer("my-topic", ProducerConfiguration().setSchema(schemaInfo), producer);
+   std::string jsonData = "{\"re\":2.1,\"im\":1.23}";
+   producer.send(MessageBuilder().setContent(std::move(jsonData)).build());
+   ```
+
+2. Create a consumer to receive messages.
+
+   ```cpp
+   Consumer consumer;
+   client.subscribe("my-topic", "my-sub", ConsumerConfiguration().setSchema(schemaInfo), consumer);
+   Message msg;
+   consumer.receive(msg);
+   ```
+
+</TabItem>
+<TabItem value="Go">
 
-## Construct an AUTO_PRODUCE schema
+Suppose you have a `protobufDemo` struct as follows, and you'd like to transmit it over a Pulsar topic.
+
+```go
+type protobufDemo struct {
+    Num                  int32    `protobuf:"varint,1,opt,name=num,proto3" json:"num,omitempty"`
+    Msf                  string   `protobuf:"bytes,2,opt,name=msf,proto3" json:"msf,omitempty"`
+    XXX_NoUnkeyedLiteral struct{} `json:"-"`
+    XXX_unrecognized     []byte   `json:"-"`
+    XXX_sizecache        int32    `json:"-"`
+}
+```
+
+1. Add a `protoSchemaDef` like this:
+
+   ```go
+   var (
+       protoSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
+           "\"fields\":[{\"name\":\"num\",\"type\":\"int\"},{\"name\":\"msf\",\"type\":\"string\"}]}"
+   )
+   ```
+
+2. Create a producer/consumer to send/receive messages:
+
+   ```go
+   psProducer := pulsar.NewProtoSchema(protoSchemaDef, nil)
+   producer, err := client.CreateProducer(pulsar.ProducerOptions{
+       Topic:  "proto",
+       Schema: psProducer,
+   })
+   msgId, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
+       Value: &protobufDemo{
+           Num: 100,
+           Msf: "pulsar",
+       },
+   })
+   psConsumer := pulsar.NewProtoSchema(protoSchemaDef, nil)
+   consumer, err := client.Subscribe(pulsar.ConsumerOptions{
+       Topic:                       "proto",
+       SubscriptionName:            "sub-1",
+       Schema:                      psConsumer,
+       SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
+   })
+   msg, err := consumer.Receive(context.Background())
+   ```
+
+</TabItem>
+</Tabs>
+````
+
+### Native Avro
+
+This example shows how to construct a [native Avro schema](schema-understand.md#struct-schema).
+
+```java
+org.apache.avro.Schema nativeAvroSchema = … ;
+Producer<byte[]> producer = pulsarClient.newProducer().topic("ingress").create();
+byte[] content = … ;
+producer.newMessage(Schema.NATIVE_AVRO(nativeAvroSchema)).value(content).send();
+```
+
+### AUTO_PRODUCE
 
 Suppose you have a Pulsar topic _P_, a producer processing messages from a Kafka topic _K_, an application reading the messages from _K_ and writing the messages to _P_.
 
 This example shows how to construct an [AUTO_PRODUCE](schema-understand.md#auto-schema) schema to verify whether the bytes produced by _K_ can be sent to _P_.
 
 ```java
-Produce<byte[]> pulsarProducer = client.newProducer(Schema.AUTO_PRODUCE())
+Producer<byte[]> pulsarProducer = client.newProducer(Schema.AUTO_PRODUCE_BYTES())
     …
     .create();
-
 byte[] kafkaMessageBytes = … ; 
-
 pulsarProducer.send(kafkaMessageBytes);
 ```
 
-## Construct an AUTO_CONSUME schema
+### AUTO_CONSUME
 
-Suppose you have a Pulsar topic _P_, a consumer (for example, _MySQL_) receiving messages from the topic _P_, an application reading the messages from _P_ and writing the messages to _MySQL_.
+Suppose you have a Pulsar topic _P_ and a consumer _MySQL_ that receives messages from _P_, and you want to check whether these messages contain the information that your application needs.
 
 This example shows how to construct an [AUTO_CONSUME schema](schema-understand.md#auto-schema) to verify whether the bytes produced by _P_ can be sent to _MySQL_.
 
@@ -387,31 +878,23 @@ Consumer<GenericRecord> pulsarConsumer = client.newConsumer(Schema.AUTO_CONSUME(
 
 Message<GenericRecord> msg = pulsarConsumer.receive();
 GenericRecord record = msg.getValue();
-```
-
-## Construct a native Avro schema
-
-This example shows how to construct a [native Avro schema](schema-understand.md#native-avro-schema).
-
-```java
-org.apache.avro.Schema nativeAvroSchema = … ;
-
-Producer<byte[]> producer = pulsarClient.newProducer().topic("ingress").create();
-
-byte[] content = … ;
-
-producer.newMessage(Schema.NATIVE_AVRO(nativeAvroSchema)).value(content).send();
+record.getFields().forEach(field -> {
+   if (field.getName().equals("theNeededFieldName")) {
+       Object recordField = record.getField(field);
+       // Do something with the field value
+   }
+});
 ```
 
 ## Customize schema storage
 
 By default, Pulsar stores various data types of schemas in [Apache BookKeeper](https://bookkeeper.apache.org) deployed alongside Pulsar. Alternatively, you can use another storage system if needed. 
 
-To use a non-default (non-BookKeeper) storage system for Pulsar schemas, you need to implement the following Java interfaces: 
-* [SchemaStorage interface](#schemastorage-interface) 
-* [SchemaStorageFactory interface](#schemastoragefactory-interface)
+To use a non-default (non-BookKeeper) storage system for Pulsar schemas, you need to implement the following Java interfaces before [deploying custom schema storage](#deploy-custom-schema-storage): 
+* [SchemaStorage interface](#implement-schemastorage-interface) 
+* [SchemaStorageFactory interface](#implement-schemastoragefactory-interface)
 
-### Implement SchemaStorage interface
+### Implement `SchemaStorage` interface
 
 The `SchemaStorage` interface has the following methods:
 
@@ -439,11 +922,11 @@ public interface SchemaStorage {
 
 :::tip
 
-For a complete example of **schema storage** implementation, see [BookKeeperSchemaStorage](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java) class.
+For a complete example of **schema storage** implementation, see the [BookKeeperSchemaStorage](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java) class.
 
 :::
 
-### Implement SchemaStorageFactory interface 
+### Implement `SchemaStorageFactory` interface 
 
 The `SchemaStorageFactory` interface has the following method:
 
@@ -456,7 +939,7 @@ public interface SchemaStorageFactory {
 
 :::tip
 
-For a complete example of **schema storage factory** implementation, see [BookKeeperSchemaStorageFactory](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java) class.
+For a complete example of **schema storage factory** implementation, see the [BookKeeperSchemaStorageFactory](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java) class.
 
 :::
 
@@ -470,4 +953,4 @@ To use your custom schema storage implementation, perform the following steps.
    
 3. Change the `schemaRegistryStorageClassName` configuration in the `conf/broker.conf` file to your custom factory class.
       
-4. Start Pulsar.
+4. Start Pulsar.
\ No newline at end of file
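For step 3 of the deployment instructions above, the resulting broker configuration change might look like the following. The factory class name is a hypothetical example; substitute your own implementation's fully qualified class name.

```conf
# conf/broker.conf
# Point the schema registry at your custom SchemaStorageFactory implementation.
schemaRegistryStorageClassName=com.example.schema.CustomSchemaStorageFactory
```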
diff --git a/site2/website-next/docs/schema-overview.md b/site2/website-next/docs/schema-overview.md
index 52843fc9a01..53aa79a21c8 100644
--- a/site2/website-next/docs/schema-overview.md
+++ b/site2/website-next/docs/schema-overview.md
@@ -13,100 +13,79 @@ This section introduces the following content:
 
 ## What is Pulsar Schema
 
-Pulsar messages are stored as unstructured byte arrays and the data structure (as known as schema) is applied to this data only when it's read. The schema serializes the bytes before they are published to a topic and deserializes them before they are delivered to the consumers, dictating which data types are recognized as valid for a given topic.
+Pulsar messages are stored as unstructured byte arrays and the data structure (also known as the schema) is applied to this data only when it's read. So both the producer and consumer need to agree upon the data structure of the messages, including the fields and their associated types.
 
-Pulsar schema registry is a central repository to store the schema information, which enables producers/consumers to coordinate on the schema of a topic’s data through brokers.
+Pulsar schema is the metadata that defines how to translate the raw message bytes into a more formal structure type, serving as a protocol between the applications that generate messages and the applications that consume them. It serializes data into raw bytes before they are published to a topic and deserializes the raw bytes before they are delivered to consumers.
+
+Pulsar uses a schema registry as a central repository to store the registered schema information, which enables producers/consumers to coordinate the schema of a topic’s messages through brokers.
+
+![Pulsar schema](/assets/schema.svg)
 
 :::note
 
-Currently, Pulsar schema is only available for the [Java client](client-libraries-java.md), [Go client](client-libraries-go.md), [Python client](client-libraries-python.md), and [C++ client](client-libraries-cpp.md).
+Currently, Pulsar schema is available for [Java clients](client-libraries-java.md), [Go clients](client-libraries-go.md), [Python clients](client-libraries-python.md), [C++ clients](client-libraries-cpp.md), and [C# clients](client-libraries-dotnet.md).
 
 :::
 
 ## Why use it
 
-Type safety is extremely important in any application built around a messaging and streaming system. Raw bytes are flexible for data transfer, but the flexibility and neutrality come with a cost: you have to overlay data type checking and serialization/deserialization to ensure that the bytes fed into the system can be read and successfully consumed. In other words, you need to make sure the data intelligible and usable to applications.
+Type safety is extremely important in any application built around a messaging and streaming system. Raw bytes are flexible for data transfer, but the flexibility and neutrality come with a cost: you have to overlay data type checking and serialization/deserialization to ensure that the bytes fed into the system can be read and successfully consumed. In other words, you need to make sure the data is intelligible and usable to applications.
 
 Pulsar schema resolves the pain points with the following capabilities:
 * enforces the data type safety when a topic has a schema defined. As a result, producers/consumers are only allowed to connect if they are using a "compatible" schema.
 * provides a central location for storing information about the schemas used within your organization, in turn greatly simplifies the sharing of this information across application teams.
 * serves as a single source of truth for all the message schemas used across all your services and development teams, which makes it easier for them to collaborate.
 * keeps data compatibility on-track between schema versions. When new schemas are uploaded, the new versions can be read by old consumers. 
-* stored in the existing storage layer BookKeeper, no additional system required.
+* stores schema data in the existing storage layer BookKeeper, with no additional system required.
 
 ## How it works
 
-Pulsar schemas are applied and enforced at the **topic** level (schemas cannot be applied at the namespace or tenant level). 
-
-Producers and consumers upload schemas to brokers, so Pulsar schemas work on the producer side and the consumer side.
+Pulsar schemas are applied and enforced at the **topic** level. Producers and consumers can upload schemas to brokers, so Pulsar schemas work on both sides.
 
 ### Producer side
 
-This diagram illustrates how schema works on the Producer side.
+This diagram illustrates how Pulsar schema works on the Producer side.
 
-![Schema works at the producer side](/assets/schema-producer.png)
+![How Pulsar schema works on the producer side](/assets/schema-producer.svg)
 
 1. The application uses a schema instance to construct a producer instance. 
+   The schema instance defines the schema for the data being produced using the producer instance. Taking Avro as an example, Pulsar extracts the schema definition from the POJO class and constructs the `SchemaInfo`.
 
-   The schema instance defines the schema for the data being produced using the producer instance. 
-
-   Take AVRO as an example, Pulsar extracts schema definition from the POJO class and constructs the `SchemaInfo` that the producer needs to pass to a broker when it connects.
-
-2. The producer connects to the broker with the `SchemaInfo` extracted from the passed-in schema instance.
-   
-3. The broker looks up the schema in the schema storage to check if it is already a registered schema. 
+2. The producer requests to connect to the broker with the `SchemaInfo` extracted from the passed-in schema instance.
    
-4. If yes, the broker skips the schema validation since it is a known schema, and returns the schema version to the producer.
-
-5. If no, the broker verifies whether a schema can be automatically created in this namespace:
-
-   * If `isAllowAutoUpdateSchema` sets to **true**, then a schema can be created, and the broker validates the schema based on the schema compatibility check strategy defined for the topic.
-  
-   * If `isAllowAutoUpdateSchema` sets to **false**, then a schema can not be created, and the producer is rejected to connect to the broker.
-  
-   :::tip
-
-   `isAllowAutoUpdateSchema` can be set via **Pulsar admin API** or **REST API.** 
-
-   For how to set `isAllowAutoUpdateSchema` via Pulsar admin API, see [Manage AutoUpdate Strategy](admin-api-schemas.md#manage-autoupdate-strategy). 
-
-    :::
+3. The broker looks up the schema registry to check if it is a registered schema. 
+   * If the schema is registered, the broker returns the schema version to the producer.
+   * Otherwise, go to step 4.
 
-6. If the schema is allowed to be updated, then the compatible strategy check is performed.
-  
-   * If the schema is compatible, the broker stores it and returns the schema version to the producer. All the messages produced by this producer are tagged with the schema version. 
+4. The broker checks whether the schema can be auto-updated. 
+   * If it’s not allowed to be auto-updated, then the schema cannot be registered, and the broker rejects the producer.
+   * Otherwise, go to step 5.
 
-   * If the schema is incompatible, the broker rejects it.
+5. The broker performs the [schema compatibility check](schema-understand.md#schema-compatibility-check) defined for the topic.
+   * If the schema passes the compatibility check, the broker stores it in the schema registry and returns the schema version to the producer. All the messages produced by this producer are tagged with the schema version. 
+   * Otherwise, the broker rejects the producer.
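The broker-side decision flow above can be sketched as follows. This is an illustrative simplification, not actual Pulsar broker code; the registry, flag, and compatibility check are hypothetical stand-ins for the broker's internals.

```python
# Sketch of the broker-side decision flow in steps 3-5 above.
# `registry` maps a topic to its list of registered schema versions;
# `is_compatible` stands in for the configured compatibility check strategy.
def handle_producer_connect(registry, topic, schema, allow_auto_update, is_compatible):
    versions = registry.setdefault(topic, [])
    if schema in versions:                      # step 3: schema already registered
        return f"connected, schema version {versions.index(schema)}"
    if not allow_auto_update:                   # step 4: auto-update disallowed
        return "rejected"
    if not all(is_compatible(schema, old) for old in versions):  # step 5
        return "rejected"
    versions.append(schema)                     # store the new schema version
    return f"connected, schema version {versions.index(schema)}"

registry = {}
print(handle_producer_connect(registry, "my-topic", "v1", True, lambda a, b: True))
# A second producer with the same schema reuses the registered version.
print(handle_producer_connect(registry, "my-topic", "v1", True, lambda a, b: True))
```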
 
 ### Consumer side
 
 This diagram illustrates how schema works on the consumer side. 
 
-![Schema works at the consumer side](/assets/schema-consumer.png)
+![How Pulsar schema works on the consumer side](/assets/schema-consumer.svg)
 
 1. The application uses a schema instance to construct a consumer instance.
-   
-   The schema instance defines the schema that the consumer uses for decoding messages received from a broker.
 
 2. The consumer connects to the broker with the `SchemaInfo` extracted from the passed-in schema instance.
 
-3. The broker determines whether the topic has one of them (a schema/data/a local consumer and a local producer).
+3. The broker checks if the topic is in use (has at least one of the objects: schema, data, active producer or consumer).
+   * If a topic has at least one of the above objects, go to step 5.
+   * Otherwise, go to step 4.
 
-4. If a topic does not have all of them (a schema/data/a local consumer and a local producer):
-   
-     * If `isAllowAutoUpdateSchema` sets to **true**, then the consumer registers a schema and it is connected to a broker.
-       
-     * If `isAllowAutoUpdateSchema` sets to **false**, then the consumer is rejected to connect to a broker.
-       
-5. If a topic has one of them (a schema/data/a local consumer and a local producer), then the schema compatibility check is performed.
-   
-     * If the schema passes the compatibility check, then the consumer is connected to the broker.
+4. The broker checks whether the schema can be auto-updated.
+     * If the schema can be auto-updated, the broker registers the schema and connects the consumer.
+     * Otherwise, the broker rejects the consumer.
        
-     * If the schema does not pass the compatibility check, then the consumer is rejected to connect to the broker. 
-
-6. The consumer receives messages from the broker. 
-
-   If the schema used by the consumer supports schema versioning (for example, AVRO schema), the consumer fetches the `SchemaInfo` of the version tagged in messages and uses the passed-in schema and the schema tagged in messages to decode the messages.
+5. The broker performs the [schema compatibility check](schema-understand.md#schema-compatibility-check).
+     * If the schema passes the compatibility check, the broker connects the consumer.
+     * Otherwise, the broker rejects the consumer. 
 
 ## Use case
 
@@ -116,8 +95,15 @@ For example, you are using the _User_ class to define the messages sent to Pulsa
 
 ```java
 public class User {
-    String name;
-    int age;
+   public String name;
+   public int age;
+   
+   User() {}
+   
+   User(String name, int age) {
+      this.name = name;
+      this.age = age;
+   }
 }
 ```
 
@@ -139,16 +125,26 @@ producer.send(message);
 This example constructs a producer with the _JSONSchema_, and you can send the _User_ class to topics directly without worrying about how to serialize POJOs into bytes.
 
 ```java
+// send with json schema
 Producer<User> producer = client.newProducer(JSONSchema.of(User.class))
         .topic(topic)
         .create();
 User user = new User("Tom", 28);
 producer.send(user);
+
+// receive with json schema
+Consumer<User> consumer = client.newConsumer(JSONSchema.of(User.class))
+   .topic(topic)
+   .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+   .subscriptionName("schema-sub")
+   .subscribe();
+Message<User> message = consumer.receive();
+User receivedUser = message.getValue();
+assert receivedUser.age == 28 && receivedUser.name.equals("Tom");
 ```
 
 ## What's next?
 
-* [Understand basic concepts](schema-understand.md)
-* [Schema evolution and compatibility](schema-evolution-compatibility.md)
-* [Get started](schema-get-started.md)
+* [Understand schema concepts](schema-understand.md)
+* [Get started with schema](schema-get-started.md)
 * [Manage schema](admin-api-schemas.md)
diff --git a/site2/website-next/docs/schema-understand.md b/site2/website-next/docs/schema-understand.md
index 64cb6c9b6d8..239c81aad57 100644
--- a/site2/website-next/docs/schema-understand.md
+++ b/site2/website-next/docs/schema-understand.md
@@ -10,24 +10,13 @@ import TabItem from '@theme/TabItem';
 ````
 
 
-This section explains the basic concepts of Pulsar schema and provides additional reference.
+This section explains the basic concepts of Pulsar schema and provides additional references.
 
 ## Schema definition
 
 Pulsar schema is defined in a data structure called `SchemaInfo`. It is stored and enforced on a per-topic basis and cannot be stored at the namespace or tenant level.
 
-A `SchemaInfo` consists of the following fields:
-
-|  Field  |   Description  | 
-| --- | --- |
-|  `name`  |   Schema name (a string).  | 
-|  `type`  |   Schema type, which determines how to interpret the schema data. <li>Predefined schema: see [here](#schema-type). </li><li>Customized schema: it is left as an empty string. </li> | 
-|  `schema`(`payload`)  |   Schema data, which is a sequence of 8-bit unsigned bytes and schema-type specific.  | 
-|  `properties`  |   It is a user-defined property as a string/string map. Applications can use this bag for carrying any application-specific logics. Possible properties might be the Git hash associated with the schema, and an environment string like `dev` or `prod`.  | 
-
-**Example**
-
-This is the `SchemaInfo` of a string.
+The following is an example of the `SchemaInfo` of a string schema.
 
 ```json
 {
@@ -38,6 +27,15 @@ This is the `SchemaInfo` of a string.
 }
 ```
 
+The following table outlines the fields that each `SchemaInfo` consists of.
+
+|  Field  |   Description  | 
+| --- | --- |
+|  `name`  |   Schema name (a string).  | 
+|  `type`  |   [Schema type](#schema-type) that determines how to serialize and deserialize the schema data. | 
+|  `schema`  |   Schema data, which is a sequence of 8-bit unsigned bytes; how it is interpreted is specific to the schema type.  | 
+|  `properties`  |   A user-defined property as a string/string map, which can be used by applications to carry any application-specific logic.  |
+
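To make the field layout concrete, a `SchemaInfo` like the string example above can be modeled as a plain JSON document. This is ordinary JSON handling for illustration only, not a Pulsar client API, and the property values are hypothetical.

```python
import json

# Plain-JSON model of the SchemaInfo fields described above.
schema_info = {
    "name": "test-string-schema",
    "type": "STRING",
    "schema": "",                 # empty for primitive types such as STRING
    "properties": {"owner": "pulsar", "env": "dev"},
}
encoded = json.dumps(schema_info)
decoded = json.loads(encoded)
assert decoded["type"] == "STRING" and decoded["properties"]["env"] == "dev"
```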
 ## Schema type
 
 Pulsar supports various schema types, which are mainly divided into two categories: 
@@ -48,92 +46,227 @@ Pulsar supports various schema types, which are mainly divided into two categori
 
 The following table outlines the primitive types that Pulsar schema supports, and the conversions between **schema types** and **language-specific primitive types**.
 
-| Primitive Type | Description | Java Type| Python Type | Go Type |
-|---|---|---|---|---|
-| `BOOLEAN` | A binary value | boolean | bool | bool |
-| `INT8` | A 8-bit signed integer | int | | int8 |
-| `INT16` | A 16-bit signed integer | int | | int16 |
-| `INT32` | A 32-bit signed integer | int | | int32 |
-| `INT64` | A 64-bit signed integer | int | | int64 |
-| `FLOAT` | A single precision (32-bit) IEEE 754 floating-point number | float | float | float32 |
-| `DOUBLE` | A double-precision (64-bit) IEEE 754 floating-point number | double | float | float64|
-| `BYTES` | A sequence of 8-bit unsigned bytes | byte[], ByteBuffer, ByteBuf | bytes | []byte |
-| `STRING` | A Unicode character sequence | string | str | string| 
-| `TIMESTAMP` (`DATE`, `TIME`) |  A logic type represents a specific instant in time with millisecond precision. <br />It stores the number of milliseconds since `January 1, 1970, 00:00:00 GMT` as an `INT64` value |  java.sql.Timestamp (java.sql.Time, java.util.Date) | | |
-| INSTANT | A single instantaneous point on the time-line with nanoseconds precision| java.time.Instant | | |
-| LOCAL_DATE | An immutable date-time object that represents a date, often viewed as year-month-day| java.time.LocalDate | | |
-| LOCAL_TIME | An immutable date-time object that represents a time, often viewed as hour-minute-second. Time is represented to nanosecond precision.| java.time.LocalDateTime | |
-| LOCAL_DATE_TIME | An immutable date-time object that represents a date-time, often viewed as year-month-day-hour-minute-second | java.time.LocalTime | |
+| Primitive Type | Description | Java Type| Python Type | Go Type | C++ Type | C# Type|
+|---|---|---|---|---|---|---|
+| `BOOLEAN` | A binary value. | boolean | bool | bool | bool | bool |
+| `INT8` | An 8-bit signed integer. | int | int | int8 | int8_t | byte |
+| `INT16` | A 16-bit signed integer. | int | int | int16 | int16_t | short |
+| `INT32` | A 32-bit signed integer. | int | int | int32 | int32_t | int |
+| `INT64` | A 64-bit signed integer. | int | int | int64 | int64_t | long |
+| `FLOAT` | A single precision (32-bit) IEEE 754 floating-point number. | float | float | float32 | float | float |
+| `DOUBLE` | A double-precision (64-bit) IEEE 754 floating-point number. | double | double | float64| double | double |
+| `BYTES` | A sequence of 8-bit unsigned bytes. | byte[], ByteBuffer, ByteBuf | bytes | []byte | void * | byte[], ReadOnlySequence<byte\> |
+| `STRING` | A Unicode character sequence. | string | str | string | std::string | string |
+| `TIMESTAMP` (`DATE`, `TIME`) |  A logical type that represents a specific instant in time with millisecond precision. <br />It stores the number of milliseconds since `January 1, 1970, 00:00:00 GMT` as an `INT64` value. |  java.sql.Timestamp (java.sql.Time, java.util.Date) | N/A | N/A | N/A | DateTime, TimeSpan |
+| `INSTANT`| A single instantaneous point on the timeline with nanosecond precision. | java.time.Instant | N/A | N/A | N/A | N/A |
+| `LOCAL_DATE` | An immutable date-time object that represents a date, often viewed as year-month-day. | java.time.LocalDate | N/A | N/A | N/A | N/A |
+| `LOCAL_TIME` | An immutable date-time object that represents a time, often viewed as hour-minute-second. Time is represented to nanosecond precision. | java.time.LocalTime | N/A | N/A | N/A | N/A |
+| `LOCAL_DATE_TIME` | An immutable date-time object that represents a date-time, often viewed as year-month-day-hour-minute-second. | java.time.LocalDateTime | N/A | N/A | N/A | N/A |
+
+:::note
 
-For primitive types, Pulsar does not store any schema data in `SchemaInfo`. The `type` in `SchemaInfo` determines how to serialize and deserialize the data. 
+Pulsar does not store any schema data in `SchemaInfo` for primitive types. Some of the primitive schema implementations can use the `properties` parameter to store implementation-specific tunable settings. For example, a string schema can use `properties` to store the encoding charset to serialize and deserialize strings.
 
-Some of the primitive schema implementations can use `properties` to store implementation-specific tunable settings. For example, a `string` schema can use `properties` to store the encoding charset to serialize and deserialize strings.
+:::
 
-For more instructions, see [Construct a string schema](schema-get-started.md#construct-a-string-schema).
+For more instructions and examples, see [Construct a string schema](schema-get-started.md#string).
 
 
 ### Complex type
 
-Currently, Pulsar supports the following complex types:
+The following table outlines the complex types that Pulsar schema supports:
 
 | Complex Type | Description |
 |---|---|
-| `KeyValue` | Represents a complex type of a key/value pair. |
-| `Struct` | Handles structured data. It supports `AvroBaseStructSchema` and `ProtobufNativeSchema`. |
+| `KeyValue` | Represents a complex key/value pair. |
+| `Struct` | Represents structured data, including `AvroBaseStructSchema`, `ProtobufNativeSchema`, and `NativeAvroBytesSchema`. |
 
 #### `KeyValue` schema
 
-`KeyValue` schema helps applications define schemas for both key and value. Pulsar stores the `SchemaInfo` of key schema and the value schema together.
+`KeyValue` schema helps applications define schemas for both key and value. Pulsar stores the `SchemaInfo` of the key schema and the value schema together.
 
-You can choose the encoding type when constructing the key/value schema.:
-* `INLINE` - Key/value pairs are encoded together in the message payload.
-* `SEPARATED` - see [Construct a key/value schema](schema-get-started.md#construct-a-keyvalue-schema).
+Pulsar provides the following methods to encode a **single** key/value pair in a message:
+* `INLINE` - Key/value pairs are encoded together in the message payload.
+* `SEPARATED` - The key is stored as the message key, while the value is stored as the message payload. See [Construct a key/value schema](schema-get-started.md#keyvalue) for more details.
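+
+As a minimal sketch (assuming an existing `PulsarClient` named `client`; the topic name is illustrative), a key/value schema with `SEPARATED` encoding can be constructed and used like this:
+
+```java
+// Build a key/value schema: INT32 keys, STRING values.
+// With SEPARATED encoding, the key bytes go into the message key
+// and the value bytes go into the message payload.
+Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
+        Schema.INT32,
+        Schema.STRING,
+        KeyValueEncodingType.SEPARATED);
+
+Producer<KeyValue<Integer, String>> producer = client.newProducer(kvSchema)
+        .topic("my-topic")
+        .create();
+producer.newMessage().value(new KeyValue<>(1, "value-1")).send();
+```
+
+With `INLINE` encoding instead, both key and value would be packed into the message payload, so the message key remains available for other uses such as routing.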
 
 #### `Struct` schema
 
-`Struct` schema supports `AvroBaseStructSchema` and `ProtobufNativeSchema`.
+The following table outlines the `struct` types that Pulsar schema supports:
 
-|Type|Description|
----|---|
-`AvroBaseStructSchema`|Pulsar uses [Avro Specification](http://avro.apache.org/docs/current/spec.html) to declare the schema definition for `AvroBaseStructSchema`, which supports  `AvroSchema`, `JsonSchema`, and `ProtobufSchema`. <br /><br />This allows Pulsar:<br />- to use the same tools to manage schema definitions<br />- to use different serialization or deserialization methods to handle data|
-`ProtobufNativeSchema`|`ProtobufNativeSchema` is based on protobuf native Descriptor. <br /><br />This allows Pulsar:<br />- to use native protobuf-v3 to serialize or deserialize data<br />- to use `AutoConsume` to deserialize data.
+| Type                    | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                      [...]
+|-------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
+| `AvroBaseStructSchema`  | Pulsar uses [Avro Specification](http://avro.apache.org/docs/current/spec.html) to declare the schema definition for `AvroBaseStructSchema`, which supports [`AvroSchema`](schema-get-started.md#avro), [`JsonSchema`](schema-get-started.md#json), and [`ProtobufSchema`](schema-get-started.md#protobuf).<br /><br />This allows Pulsar to:<br />- use the same tools to manage schema definitions.<br />- use different serialization or deserialization methods to handle da [...]
+| `ProtobufNativeSchema`  | [`ProtobufNativeSchema`](schema-get-started.md#protobufnative) is based on protobuf native descriptor. <br /><br />This allows Pulsar to:<br />- use native protobuf-v3 to serialize or deserialize data.<br />- use `AutoConsume` to deserialize data.                                                                                                                                                                                                                          [...]
+| `NativeAvroBytesSchema` | [`NativeAvroBytesSchema`](schema-get-started.md#native-avro) wraps a native Avro schema type `org.apache.avro.Schema`. The result is a schema instance that accepts a serialized Avro payload without validating it against the wrapped Avro schema. <br /><br />When you migrate or ingest event or messaging data from external systems (such as Kafka and Cassandra), the data is often already serialized in Avro format. The applications producing the data typically have [...]
 
 Pulsar provides the following methods to use the `struct` schema. 
 * `static`
 * `generic`
 * `SchemaDefinition`
 
-For more examples, see [Construct a struct schema](schema-get-started.md#construct-a-struct-schema).
+This example shows how to construct a `struct` schema with these methods and use it to produce and consume messages.
+
+````mdx-code-block
+<Tabs 
+  defaultValue="static"
+  values={[{"label":"static","value":"static"},{"label":"generic","value":"generic"},{"label":"SchemaDefinition","value":"SchemaDefinition"}]}>
+
+<TabItem value="static">
+
+You can predefine the `struct` schema, which can be a POJO in Java, a `struct` in Go, or classes generated by Avro or Protobuf tools. 
+
+**Example** 
+
+Pulsar gets the schema definition from the predefined `struct` using an Avro library. The schema definition is the schema data stored as a part of the `SchemaInfo`.
+
+1. Create the _User_ class to define the messages sent to Pulsar topics.
+
+   ```java
+   public static class User {
+       public String name;
+       public int age;
+       public User(String name, int age) {
+           this.name = name;
+           this.age = age;
+       }
+       public User() {}
+   }
+   ```
+
+2. Create a producer with a `struct` schema and send messages.
+
+   ```java
+   Producer<User> producer = client.newProducer(Schema.AVRO(User.class)).create();
+   producer.newMessage().value(new User("pulsar-user", 1)).send();
+   ```
+
+3. Create a consumer with a `struct` schema and receive messages.
+
+   ```java
+   Consumer<User> consumer = client.newConsumer(Schema.AVRO(User.class)).subscribe();
+   User user = consumer.receive().getValue();
+   ```
+
+</TabItem>
+<TabItem value="generic">
+
+Sometimes applications do not have predefined structs. In this case, you can use this method to define the schema and access the data.
+
+You can define the `struct` schema using `RecordSchemaBuilder`, generate a generic record using `GenericRecordBuilder`, and consume messages into `GenericRecord`.
+
+**Example** 
+
+1. Use `RecordSchemaBuilder` to build a schema.
+
+   ```java
+   RecordSchemaBuilder recordSchemaBuilder = SchemaBuilder.record("schemaName");
+   recordSchemaBuilder.field("intField").type(SchemaType.INT32);
+   SchemaInfo schemaInfo = recordSchemaBuilder.build(SchemaType.AVRO);
+   
+   Consumer<GenericRecord> consumer = client.newConsumer(Schema.generic(schemaInfo))
+        .topic(topicName)
+        .subscriptionName(subscriptionName)
+        .subscribe();
+   Producer<GenericRecord> producer = client.newProducer(Schema.generic(schemaInfo))
+        .topic(topicName)
+        .create();
+   ```
+
+2. Use `GenericRecordBuilder` to build the struct records.
+
+   ```java
+   GenericSchemaImpl schema = GenericAvroSchema.of(schemaInfo);
+   // send message
+   GenericRecord record = schema.newRecordBuilder().set("intField", 32).build();
+   producer.newMessage().value(record).send();
+   // receive message
+   Message<GenericRecord> msg = consumer.receive();
+   
+   Assert.assertEquals(msg.getValue().getField("intField"), 32);
+   ```
+
+</TabItem>
+<TabItem value="SchemaDefinition">
+
+You can define the `schemaDefinition` to generate a `struct` schema.
+
+**Example** 
+
+1. Create the _User_ class to define the messages sent to Pulsar topics.
+
+   ```java
+   public static class User {
+       public String name;
+       public int age;
+       public User(String name, int age) {
+           this.name = name;
+           this.age = age;
+       }
+       public User() {}
+   }
+   ```
+
+2. Create a producer with a `SchemaDefinition` and send messages.
+
+   ```java
+   SchemaDefinition<User> schemaDefinition = SchemaDefinition.<User>builder().withPojo(User.class).build();
+   Producer<User> producer = client.newProducer(Schema.AVRO(schemaDefinition)).create();
+   producer.newMessage().value(new User("pulsar-user", 1)).send();
+   ```
+
+3. Create a consumer with a `SchemaDefinition` schema and receive messages.
+
+   ```java
+   SchemaDefinition<User> schemaDefinition = SchemaDefinition.<User>builder().withPojo(User.class).build();
+   Consumer<User> consumer = client.newConsumer(Schema.AVRO(schemaDefinition)).subscribe();
+   User user = consumer.receive().getValue();
+   ```
+
+</TabItem>
+</Tabs>
+````
 
 ### Auto Schema
 
-If you don't know the schema type of a Pulsar topic in advance, you can use AUTO schema to produce or consume generic records to or from brokers.
+If you cannot know the schema type of a Pulsar topic in advance, you can use AUTO schemas to produce or consume generic records to or from brokers.
 
 Auto schema contains two categories:
-* `AUTO_PRODUCE` transfers data from a producer to a Pulsar topic that has a schema and helps the producer validate whether the out-bound bytes are compatible with the schema of the topic. For more instructions, see [Construct an AUTO_PRODUCE schema](schema-get-started.md#construct-an-auto_produce-schema).
-* `AUTO_CONSUME` transfers data from a Pulsar topic that has a schema to a consumer and helps the topic validate whether the out-bound bytes are compatible with the consumer. In other words, the topic deserializes messages into language-specific objects `GenericRecord` using the `SchemaInfo` retrieved from brokers. Currently, `AUTO_CONSUME` supports AVRO, JSON and ProtobufNativeSchema schemas. For more instructions, see [Construct an AUTO_CONSUME schema](schema-get-started.md#construct-a [...]
+* `AUTO_PRODUCE` transfers data from a producer to a Pulsar topic that has a schema and helps the producer validate whether the outbound bytes are compatible with the schema of the topic. For more instructions, see [Construct an AUTO_PRODUCE schema](schema-get-started.md#auto_produce).
+* `AUTO_CONSUME` transfers data from a Pulsar topic that has a schema to a consumer and helps the topic validate whether the outbound bytes are compatible with the consumer. In other words, the topic deserializes messages into language-specific `GenericRecord` objects using the `SchemaInfo` retrieved from brokers. For more instructions, see [Construct an AUTO_CONSUME schema](schema-get-started.md#auto_consume).
 
-### Native Avro Schema
+## Schema validation enforcement
 
-When migrating or ingesting event or message data from external systems (such as Kafka and Cassandra), the events are often already serialized in Avro format. The applications producing the data typically have validated the data against their schemas (including compatibility checks) and stored them in a database or a dedicated service (such as a schema registry). The schema of each serialized data record is usually retrievable by some metadata attached to that record. In such cases, a Pu [...]
+Schema validation enforcement enables brokers to reject producers and consumers that connect without a schema.
 
-Hence, we provide `Schema.NATIVE_AVRO` to wrap a native Avro schema of type `org.apache.avro.Schema`. The result is a schema instance of Pulsar that accepts a serialized Avro payload without validating it against the wrapped Avro schema. See for more details.
+By default, schema validation enforcement is **disabled** (`isSchemaValidationEnforced`=`false`) for producers, which means:
+* A producer without a schema can produce any messages to a topic with schemas, which may result in junk data being written to the topic.
+* Clients that don’t support schemas are allowed to produce messages to a topic with schemas.
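+
+As a sketch, enabling enforcement cluster-wide is a one-line change in `conf/broker.conf` (namespace-level control is also available through `pulsar-admin`):
+
+```properties
+# Reject producers/consumers that connect without a schema
+# to topics that have a schema.
+isSchemaValidationEnforced=true
+```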
 
-## Schema versioning
+For how to enable schema validation enforcement, see [Manage schema validation](admin-api-schemas.md#manage-schema-validation).
 
-Each `SchemaInfo` stored with a topic has a version. The schema version manages schema changes happening within a topic. 
+## Schema evolution
 
-Messages produced with a given `SchemaInfo` is tagged with a schema version, so when a message is consumed by a Pulsar client, the Pulsar client can use the schema version to retrieve the corresponding `SchemaInfo` and then use the `SchemaInfo` to deserialize data.
+Schemas store the details of attributes and types. To satisfy new business needs, schemas undergo evolution over time with [versioning](#schema-versioning). 
 
-Schemas are versioned in succession. Schema storage happens in a broker that handles the associated topics so that version assignments can be made. 
+:::note
 
-Once a version is assigned/fetched to/for a schema, all subsequent messages produced by that producer are tagged with the appropriate version.
+Schema evolution only applies to Avro, JSON, Protobuf, and ProtobufNative schemas. 
 
-**Example**
+:::
 
-The following example illustrates how the schema version works.
+Schema evolution may impact existing consumers. The following control measures are designed to support schema evolution and ensure that downstream consumers can handle it seamlessly:
+* [Schema compatibility check](#schema-compatibility-check)
+* [Schema `AutoUpdate`](#schema-autoupdate)
 
-Suppose that a Pulsar [Java client](client-libraries-java.md) created using the code below attempts to connect to Pulsar and begins to send messages:
+For further readings about schema evolution, see [Avro documentation](https://avro.apache.org/docs/1.10.2/spec.html#Schema+Resolution) and [Protobuf documentation](https://developers.google.com/protocol-buffers/docs/proto#optional).
+
+### Schema versioning
+
+Each `SchemaInfo` stored with a topic has a version. The schema version manages schema changes happening within a topic. 
+
+Messages produced with `SchemaInfo` are tagged with a schema version. When a message is consumed by a Pulsar client, the client can use the schema version to retrieve the corresponding `SchemaInfo` and use the correct schema to deserialize data. Once a version is assigned to or fetched from a schema, all subsequent messages produced by that producer are tagged with the appropriate version.
+
+Suppose you are using a Pulsar [Java client](client-libraries-java.md) to create a producer and send messages.
 
 ```java
 PulsarClient client = PulsarClient.builder()
@@ -146,67 +279,88 @@ Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReadin
         .create();
 ```
 
-The table below lists the possible scenarios when this connection attempt occurs and what happens in each scenario:
+The table below outlines the possible scenarios when this connection attempt occurs and the result of each scenario:
 
-| Scenario |  What happens | 
-| --- | --- |
-|  <li>No schema exists for the topic. </li> |   (1) The producer is created using the given schema. (2) Since no existing schema is compatible with the `SensorReading` schema, the schema is transmitted to the broker and stored. (3) Any consumer created using the same schema or topic can consume messages from the `sensor-data` topic.  | 
-|  <li>A schema already exists. </li><li>The producer connects using the same schema that is already stored. </li> |   (1) The schema is transmitted to the broker. (2) The broker determines that the schema is compatible. (3) The broker attempts to store the schema in [BookKeeper](concepts-architecture-overview.md#persistent-storage) but then determines that it's already stored, so it is used to tag produced messages.  |   <li>A schema already exists. </li><li>The producer connects using  [...]
+| Scenario                                                                                                        | Result                                                                                                                                                                                                                                                                                                                      |
+|-----------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| <li>No schema exists for the topic. </li>                                                                       | (1) The producer is created with the given schema. <br /> (2) The schema is transmitted to the broker and stored since there is no existing schema. <br /> (3) Any consumer created using the same schema or topic can consume messages from the `sensor-data` topic.                                                       |
+| <li>A schema already exists. </li><li>The producer connects using the same schema that is already stored. </li> | (1) The schema is transmitted to the broker.<br />  (2) The broker determines that the schema is compatible. <br /> (3) The broker attempts to store the schema in [BookKeeper](concepts-architecture-overview.md#persistent-storage) but then determines that it's already stored, so it is used to tag produced messages. |
+| <li>A schema already exists. </li><li>The producer connects using a new schema that is compatible. </li>        | (1) The schema is transmitted to the broker. <br /> (2) The broker determines that the schema is compatible and stores the new schema as the current version (with a new version number).                                                                                                                                   |
 
-## Schema AutoUpdate
+### Schema compatibility check
 
-If a schema passes the schema compatibility check, Pulsar producer automatically updates this schema to the topic it produces by default. 
+The purpose of the schema compatibility check is to ensure that existing consumers can process newly introduced messages.
 
-### AutoUpdate for producer
+When receiving a `SchemaInfo` from producers, brokers recognize the schema type and deploy the schema compatibility checker ([`schemaRegistryCompatibilityCheckers`](https://github.com/apache/pulsar/blob/bf194b557c48e2d3246e44f1fc28876932d8ecb8/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java)) for that schema type to check if the `SchemaInfo` is compatible with the schema of the topic by applying the configured compatibility check strategy. 
 
-For a producer, the `AutoUpdate` happens in the following cases:
+The default value of `schemaRegistryCompatibilityCheckers` in the `conf/broker.conf` file is as follows.
+   
+```properties
+schemaRegistryCompatibilityCheckers=org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheck,org.apache.pulsar.broker.service.schema.ProtobufNativeSchemaCompatibilityCheck
+```
 
-* If a **topic doesn’t have a schema**, Pulsar registers a schema automatically.
+Each schema type corresponds to one instance of the schema compatibility checker. Avro, JSON, and Protobuf schemas have their own compatibility checkers, while all the other schema types share a default compatibility checker that disables schema evolution.
 
-* If a **topic has a schema**:
+#### Schema compatibility check strategy
 
-  * If a **producer doesn’t carry a schema**:
+Suppose that you have a topic containing three schemas (V1, V2, and V3). V1 is the oldest and V3 is the latest. The following table outlines the 8 schema compatibility check strategies and how each works.
 
-  * If `isSchemaValidationEnforced` or `schemaValidationEnforced` is **disabled** in the namespace to which the topic belongs, the producer is allowed to connect to the topic and produce data. 
-  
-  * If `isSchemaValidationEnforced` or `schemaValidationEnforced` is **enabled** in the namespace to which the topic belongs, the producer is rejected and disconnected.
+|  Compatibility check strategy  |   Definition  |   Changes allowed  |   Check against which schema  |
+| --- | --- | --- | --- |
+|  `ALWAYS_COMPATIBLE`  |   Disable schema compatibility check.  |   All changes are allowed  |   All previous versions  |
+|  `ALWAYS_INCOMPATIBLE`  |   Disable schema evolution, that is, any schema change is rejected.  |   No change is allowed  |   N/A  | 
+|  `BACKWARD`  |   Consumers using schema V3 can process data written by producers using the **last schema version** V2.  |   <li>Add optional fields </li><li>Delete fields </li> |   Latest version  |
+|  `BACKWARD_TRANSITIVE`  |   Consumers using schema V3 can process data written by producers using **all previous schema versions** V2 and V1.  |   <li>Add optional fields </li><li>Delete fields </li> |   All previous versions  |
+|  `FORWARD`  |   Consumers using the **last schema version** V2 can process data written by producers using a new schema V3, even though they may not be able to use the full capabilities of the new schema.  |   <li>Add fields </li><li>Delete optional fields </li> |   Latest version  |
+|  `FORWARD_TRANSITIVE`  |   Consumers using **all previous schema versions** V2 or V1 can process data written by producers using a new schema V3.  |   <li>Add fields </li><li>Delete optional fields </li> |   All previous versions  |
+|  `FULL`  |   Schemas are both backward and forward compatible. <li>Consumers using the last schema V2 can process data written by producers using the new schema V3. </li><li>Consumers using the new schema V3 can process data written by producers using the last schema V2.</li>  |   Modify optional fields |   Latest version  | 
+|  `FULL_TRANSITIVE`  |   Backward and forward compatible among schema V3, V2, and V1. <li>Consumers using the schema V3 can process data written by producers using schema V2 and V1. </li><li>Consumers using the schema V2 or V1 can process data written by producers using the schema V3.</li>  |   Modify optional fields |   All previous versions  |
 
-  * If a **producer carries a schema**:
-  
-  A broker performs the compatibility check based on the configured compatibility check strategy of the namespace to which the topic belongs.
-  
-  * If the schema is registered, a producer is connected to a broker. 
-  
-  * If the schema is not registered:
-  
-     * If `isAllowAutoUpdateSchema` sets to **false**, the producer is rejected to connect to a broker.
+:::tip
+
+* The default schema compatibility check strategy varies depending on schema types.
+  * For Avro and JSON, the default one is `FULL`.
+  * For others, the default one is `ALWAYS_INCOMPATIBLE`.
+* For more instructions about how to set the strategy, see [Manage schemas](admin-api-schemas.md#set-schema-compatibility-check-strategy).
+
+:::
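+
+To make the "optional fields" wording concrete in Avro terms (the record and field names below are illustrative, not from Pulsar itself), consider this V1 schema:
+
+```json
+{
+  "type": "record",
+  "name": "SensorReading",
+  "fields": [
+    {"name": "sensorId", "type": "string"}
+  ]
+}
+```
+
+A V2 that adds an *optional* field (one with a `default`) passes the `FULL` check: V2 consumers fill in the default when reading V1 data (backward), and V1 consumers simply ignore the extra field in V2 data (forward). Adding the same field *without* a default would be forward compatible only.
+
+```json
+{
+  "type": "record",
+  "name": "SensorReading",
+  "fields": [
+    {"name": "sensorId", "type": "string"},
+    {"name": "unit", "type": ["null", "string"], "default": null}
+  ]
+}
+```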
+
+### Schema AutoUpdate
+
+By default, schema `AutoUpdate` is enabled. When a schema passes the schema compatibility check, the producer automatically updates this schema to the topic it produces. 
+
+#### Producer side
+
+For a producer, the `AutoUpdate` happens in the following cases:
+
+* If a **topic doesn’t have a schema** (meaning the data is in raw bytes), Pulsar registers the schema automatically.
+
+* If a **topic has a schema** and the **producer doesn’t carry any schema** (meaning it produces raw bytes):
+
+    * If [schema validation enforcement](#schema-validation-enforcement) is **disabled** (`schemaValidationEnforced`=`false`) in the namespace that the topic belongs to, the producer is allowed to connect to the topic and produce data. 
   
-      * If `isAllowAutoUpdateSchema` sets to **true**:
-   
-          * If the schema passes the compatibility check, then the broker registers a new schema automatically for the topic and the producer is connected.
-      
-          * If the schema does not pass the compatibility check, then the broker does not register a schema and the producer is rejected to connect to a broker.
+    * Otherwise, the producer is rejected.
 
-![AutoUpdate Producer](/assets/schema-producer.png)
+* If a **topic has a schema** and the **producer carries a schema**, see [How schema works on producer side](schema-overview.md#producer-side) for more information.
 
-### AutoUpdate for consumer
+#### Consumer side
 
 For a consumer, the `AutoUpdate` happens in the following cases:
 
-* If a **consumer connects to a topic without a schema** (which means the consumer receiving raw bytes), the consumer can connect to the topic successfully without doing any compatibility check.
+* If a consumer connects to a topic **without a schema** (meaning it consumes raw bytes), the consumer can connect to the topic successfully without doing any compatibility check.
 
-* If a **consumer connects to a topic with a schema**.
+* If a consumer connects to a topic **with a schema**, see [How schema works on consumer side](schema-overview.md#consumer-side) for more information.
 
-  * If a topic does not have all of them (a schema/data/a local consumer and a local producer):
-  
-      * If `isAllowAutoUpdateSchema` sets to **true**, then the consumer registers a schema and it is connected to a broker.
-      
-      * If `isAllowAutoUpdateSchema` sets to **false**, then the consumer is rejected to connect to a broker.
-      
-  * If a topic has one of them (a schema/data/a local consumer and a local producer), then the schema compatibility check is performed.
-  
-      * If the schema passes the compatibility check, then the consumer is connected to the broker.
-      
-      * If the schema does not pass the compatibility check, then the consumer is rejected to connect to the broker.
-      
-![AutoUpdate Consumer](/assets/schema-consumer.png)
\ No newline at end of file
+### Order of upgrading clients
+
+To adapt to schema evolution and auto-update, you need to upgrade your client applications accordingly. The upgrade order may vary depending on the configured [schema compatibility check strategy](#schema-compatibility-check-strategy).
+
+The following table outlines the mapping between the schema compatibility check strategy and the upgrade order of clients.
+
+| Compatibility check strategy | Upgrade order | Description |
+| --- | --- | --- |
+| `ALWAYS_COMPATIBLE` | Any order | The compatibility check is disabled. Consequently, you can upgrade the producers and consumers in **any order**. |
+| `ALWAYS_INCOMPATIBLE` | N/A | Schema evolution is disabled. |
+| <li>`BACKWARD` </li><li>`BACKWARD_TRANSITIVE` </li> | Consumer first | There is no guarantee that consumers using the old schema can read data produced using the new schema. Consequently, **upgrade all consumers first**, and then start producing new data. |
+| <li>`FORWARD` </li><li>`FORWARD_TRANSITIVE` </li> | Producer first | There is no guarantee that consumers using the new schema can read data produced using the old schema. Consequently, **upgrade all producers first** to use the new schema, ensure that data already produced using the old schemas is not available to consumers, and then upgrade the consumers. |
+| <li>`FULL` </li><li>`FULL_TRANSITIVE` </li> | Any order | It is guaranteed that consumers using the old schema can read data produced using the new schema, and consumers using the new schema can read data produced using the old schema. Consequently, you can upgrade the producers and consumers in **any order**. |
\ No newline at end of file
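
The upgrade-order rules in the table above can be made concrete with a small sketch. This is a hypothetical helper, not part of any Pulsar API: `read_guaranteed` and the `"old"`/`"new"` version labels are illustrative only, modeling which reads each strategy guarantees during an upgrade between two adjacent schema versions.

```python
# Hypothetical helper (not a Pulsar API): encodes the table's guarantees.
# "old" and "new" label the two schema versions involved in an upgrade.

def read_guaranteed(consumer: str, producer: str, strategy: str) -> bool:
    """Return True if a consumer on `consumer` schema is guaranteed to
    read data produced with `producer` schema under `strategy`."""
    if strategy == "ALWAYS_COMPATIBLE":
        return True  # check disabled: any order works
    if strategy == "ALWAYS_INCOMPATIBLE":
        return consumer == producer  # evolution disabled: versions must match
    if consumer == producer:
        return True
    if strategy in ("BACKWARD", "BACKWARD_TRANSITIVE"):
        # New consumers can read old data, not vice versa -> upgrade consumers first.
        return consumer == "new" and producer == "old"
    if strategy in ("FORWARD", "FORWARD_TRANSITIVE"):
        # Old consumers can read new data, not vice versa -> upgrade producers first.
        return consumer == "old" and producer == "new"
    if strategy in ("FULL", "FULL_TRANSITIVE"):
        return True  # compatible both ways: any order works
    raise ValueError(f"unknown strategy: {strategy}")

# Under FORWARD, upgrading a producer first is safe for old consumers...
assert read_guaranteed("old", "new", "FORWARD")
# ...but under BACKWARD it is not, so consumers must be upgraded first.
assert not read_guaranteed("old", "new", "BACKWARD")
```

The simulation only captures upgrade ordering between two versions; the `*_TRANSITIVE` variants additionally check compatibility against all previous versions, not just the latest one.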
diff --git a/site2/website-next/sidebars.json b/site2/website-next/sidebars.json
index db88d61e27d..97c21a759a6 100644
--- a/site2/website-next/sidebars.json
+++ b/site2/website-next/sidebars.json
@@ -40,8 +40,7 @@
       "items": [
         "schema-overview",
         "schema-understand",
-        "schema-get-started",
-        "schema-evolution-compatibility"
+        "schema-get-started"
       ]
     },
     {
diff --git a/site2/website-next/static/assets/schema-consumer.svg b/site2/website-next/static/assets/schema-consumer.svg
new file mode 100644
index 00000000000..a5e3f0c1e85
--- /dev/null
+++ b/site2/website-next/static/assets/schema-consumer.svg
@@ -0,0 +1 @@
+<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" xmlns:lucid="lucid" width="1548" height="907.88"><g transform="translate(-218.86564898492907 -1343.203082025575)" lucid:page-tab-id="0_0"><path d="M0 1258.83h1870.87v1322.84H0z" fill="#fff"/><path d="M238.87 1496.7a6 6 0 0 1 6-6h1496a6 6 0 0 1 6 6V2219a6 6 0 0 1-6 6h-1496a6 6 0 0 1-6-6z" fill="#fff"/><path d="M240.37 1496.7c0 .84-.68 1.5-1.5 1.5-.83 0-1.5-.66-1.5-1.5 0-.82.67-1.5 1.5-1.5.82 0 1.5.68 1.5 1. [...]
\ No newline at end of file
diff --git a/site2/website-next/static/assets/schema-producer.svg b/site2/website-next/static/assets/schema-producer.svg
new file mode 100644
index 00000000000..e46ed63f2e2
--- /dev/null
+++ b/site2/website-next/static/assets/schema-producer.svg
@@ -0,0 +1 @@
+<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" xmlns:lucid="lucid" width="1592" height="915.38"><g transform="translate(-2005.9370513311515 -1369.203082025575)" lucid:page-tab-id="0_0"><path d="M1806.87 1258.83h1870.86v1322.84H1806.87z" fill="#fff"/><path d="M2025.94 1520.75a6 6 0 0 1 6-6h1540a6 6 0 0 1 6 6v737.83a6 6 0 0 1-6 6h-1540a6 6 0 0 1-6-6z" fill="#fff"/><path d="M2027.44 1520.75c0 .83-.67 1.5-1.5 1.5s-1.5-.67-1.5-1.5.67-1.5 1.5-1.5 1.5.67 1.5  [...]
\ No newline at end of file
diff --git a/site2/website-next/static/assets/schema.svg b/site2/website-next/static/assets/schema.svg
new file mode 100644
index 00000000000..e8594764804
--- /dev/null
+++ b/site2/website-next/static/assets/schema.svg
@@ -0,0 +1 @@
+<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" xmlns:lucid="lucid" width="1954.13" height="1014.17"><g transform="translate(-1950 1120.167979002624)" lucid:page-tab-id="0_0"><path d="M1806.87-1258.83H5484.6V64H1806.87z" fill="#fff"/><path d="M1980-544.2a6 6 0 0 1 6-6h1348a6 6 0 0 1 6 6V-146a6 6 0 0 1-6 6H1986a6 6 0 0 1-6-6z" fill="#fff"/><path d="M1981.5-544.2c0 .82-.67 1.5-1.5 1.5s-1.5-.68-1.5-1.5c0-.83.67-1.5 1.5-1.5s1.5.67 1.5 1.5zm5.06-5.93c0 .83-. [...]
\ No newline at end of file