Posted to commits@pulsar.apache.org by li...@apache.org on 2021/12/07 04:51:07 UTC

[pulsar] branch master updated: [website][upgrade]feat: website upgrade / docs migration - 2.4.1 / get started/concepts/schema (#13142)

This is an automated email from the ASF dual-hosted git repository.

liuyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f86b6d  [website][upgrade]feat: website upgrade / docs migration - 2.4.1 / get started/concepts/schema (#13142)
3f86b6d is described below

commit 3f86b6dc3d8668d6c629f89c47cbdcdc8017e132
Author: Li Li <ur...@gmail.com>
AuthorDate: Tue Dec 7 12:49:21 2021 +0800

    [website][upgrade]feat: website upgrade / docs migration - 2.4.1 / get started/concepts/schema (#13142)
    
    Signed-off-by: LiLi <ur...@gmail.com>
    
    Co-authored-by: Anonymitaet <50...@users.noreply.github.com>
---
 site2/website-next/docusaurus.config.js            |   4 +
 .../version-2.4.1/client-libraries.md              |  35 ++
 .../concepts-architecture-overview.md              | 172 ++++++
 .../version-2.4.1/concepts-authentication.md       |   9 +
 .../version-2.4.1/concepts-clients.md              |  92 ++++
 .../version-2.4.1/concepts-messaging.md            | 438 +++++++++++++++
 .../version-2.4.1/concepts-multi-tenancy.md        |  59 ++
 .../version-2.4.1/concepts-overview.md             |  31 ++
 .../version-2.4.1/concepts-replication.md          |   9 +
 .../version-2.4.1/concepts-schema-registry.md      | 126 +++++
 .../version-2.4.1/concepts-tiered-storage.md       |  18 +
 .../version-2.4.1/concepts-topic-compaction.md     |  37 ++
 .../version-2.4.1/getting-started-standalone.md    | 266 +++++++++
 .../versioned_docs/version-2.4.1/pulsar-2.0.md     |  72 +++
 .../schema-evolution-compatibility.md              | 169 ++++++
 .../version-2.4.1/schema-get-started.md            |  98 ++++
 .../versioned_docs/version-2.4.1/schema-manage.md  | 602 +++++++++++++++++++++
 .../version-2.4.1/schema-understand.md             | 454 ++++++++++++++++
 .../version-2.4.1/standalone-docker.md             | 179 ++++++
 .../version-2.7.2/developing-binary-protocol.md    | 581 --------------------
 .../versioned_docs/version-2.7.2/developing-cpp.md | 114 ----
 .../version-2.7.2/developing-load-manager.md       | 227 --------
 .../version-2.7.2/developing-tools.md              | 111 ----
 .../versioned_sidebars/version-2.4.1-sidebars.json |  94 ++++
 site2/website-next/versions.json                   |   1 +
 25 files changed, 2965 insertions(+), 1033 deletions(-)

diff --git a/site2/website-next/docusaurus.config.js b/site2/website-next/docusaurus.config.js
index 6ec6be8..52ade69 100644
--- a/site2/website-next/docusaurus.config.js
+++ b/site2/website-next/docusaurus.config.js
@@ -192,6 +192,10 @@ module.exports = {
               to: "docs/2.4.2/"
             },
             {
+              label: "2.4.1",
+              to: "docs/2.4.1/"
+            },
+            {
               label: "2.2.0",
               to: "docs/2.2.0/",
             },
diff --git a/site2/website-next/versioned_docs/version-2.4.1/client-libraries.md b/site2/website-next/versioned_docs/version-2.4.1/client-libraries.md
new file mode 100644
index 0000000..23e5a06
--- /dev/null
+++ b/site2/website-next/versioned_docs/version-2.4.1/client-libraries.md
@@ -0,0 +1,35 @@
+---
+id: client-libraries
+title: Pulsar client libraries
+sidebar_label: "Overview"
+original_id: client-libraries
+---
+
+Pulsar supports the following client libraries:
+
+- [Java client](client-libraries-java)
+- [Go client](client-libraries-go)
+- [Python client](client-libraries-python)
+- [C++ client](client-libraries-cpp)
+- [Node.js client](client-libraries-node)
+- [WebSocket client](client-libraries-websocket)
+- [C# client](client-libraries-dotnet)
+
+## Feature matrix
+The Pulsar client feature matrix for different languages is listed on the [Client Features Matrix](https://github.com/apache/pulsar/wiki/Client-Features-Matrix) page.
+
+## Third-party clients
+
+Besides the officially released clients, multiple third-party Pulsar client projects are available in different languages.
+
+> If you have developed a new Pulsar client, feel free to submit a pull request and add your client to the list below.
+
+| Language | Project | Maintainer | License | Description |
+|----------|---------|------------|---------|-------------|
+| Go | [pulsar-client-go](https://github.com/Comcast/pulsar-client-go) | [Comcast](https://github.com/Comcast) | [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) | A native golang client |
+| Go | [go-pulsar](https://github.com/t2y/go-pulsar) | [t2y](https://github.com/t2y) | [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) | 
+| Haskell | [supernova](https://github.com/cr-org/supernova) | [Chatroulette](https://github.com/cr-org) | [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) | Native Pulsar client for Haskell | 
+| Scala | [neutron](https://github.com/cr-org/neutron) | [Chatroulette](https://github.com/cr-org) | [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) | Purely functional Apache Pulsar client for Scala built on top of Fs2 |
+| Scala | [pulsar4s](https://github.com/sksamuel/pulsar4s) | [sksamuel](https://github.com/sksamuel) | [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) | Idiomatic, typesafe, and reactive Scala client for Apache Pulsar |
+| Rust | [pulsar-rs](https://github.com/wyyerd/pulsar-rs) | [Wyyerd Group](https://github.com/wyyerd) | [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) | Future-based Rust bindings for Apache Pulsar |
+| .NET | [pulsar-client-dotnet](https://github.com/fsharplang-ru/pulsar-client-dotnet) | [Lanayx](https://github.com/Lanayx) | [![GitHub](https://img.shields.io/badge/license-MIT-green.svg)](https://opensource.org/licenses/MIT) | Native .NET client for C#/F#/VB |
diff --git a/site2/website-next/versioned_docs/version-2.4.1/concepts-architecture-overview.md b/site2/website-next/versioned_docs/version-2.4.1/concepts-architecture-overview.md
new file mode 100644
index 0000000..6a501d2
--- /dev/null
+++ b/site2/website-next/versioned_docs/version-2.4.1/concepts-architecture-overview.md
@@ -0,0 +1,172 @@
+---
+id: concepts-architecture-overview
+title: Architecture Overview
+sidebar_label: "Architecture"
+original_id: concepts-architecture-overview
+---
+
+At the highest level, a Pulsar instance is composed of one or more Pulsar clusters. Clusters within an instance can [replicate](concepts-replication) data amongst themselves.
+
+In a Pulsar cluster:
+
+* One or more brokers handle and load balance incoming messages from producers, dispatch messages to consumers, communicate with the Pulsar configuration store to handle various coordination tasks, store messages in BookKeeper instances (aka bookies), rely on a cluster-specific ZooKeeper cluster for certain tasks, and more.
+* A BookKeeper cluster consisting of one or more bookies handles [persistent storage](#persistent-storage) of messages.
+* A ZooKeeper cluster specific to that cluster handles cluster-level configuration and coordination tasks.
+
+The diagram below provides an illustration of a Pulsar cluster:
+
+![Pulsar architecture diagram](/assets/pulsar-system-architecture.png)
+
+At the broader instance level, an instance-wide ZooKeeper cluster called the configuration store handles coordination tasks involving multiple clusters, for example [geo-replication](concepts-replication).
+
+## Brokers
+
+The Pulsar message broker is a stateless component that's primarily responsible for running two other components:
+
+* An HTTP server that exposes a {@inject: rest:REST:/} API for both administrative tasks and [topic lookup](concepts-clients.md#client-setup-phase) for producers and consumers. The producers connect to the brokers to publish messages and the consumers connect to the brokers to consume the messages.
+* A dispatcher, which is an asynchronous TCP server over a custom [binary protocol](developing-binary-protocol) used for all data transfers.
+
+Messages are typically dispatched out of a [managed ledger](#managed-ledgers) cache for the sake of performance, *unless* the backlog exceeds the cache size. If the backlog grows too large for the cache, the broker will start reading entries from BookKeeper.
+
+Finally, to support geo-replication on global topics, the broker manages replicators that tail the entries published in the local region and republish them to the remote region using the Pulsar [Java client library](client-libraries-java).
+
+> For a guide to managing Pulsar brokers, see the [brokers](admin-api-brokers) guide.
+
+## Clusters
+
+A Pulsar instance consists of one or more Pulsar *clusters*. Clusters, in turn, consist of:
+
+* One or more Pulsar [brokers](#brokers)
+* A ZooKeeper quorum used for cluster-level configuration and coordination
+* An ensemble of bookies used for [persistent storage](#persistent-storage) of messages
+
+Clusters can replicate amongst themselves using [geo-replication](concepts-replication).
+
+> For a guide to managing Pulsar clusters, see the [clusters](admin-api-clusters) guide.
+
+## Metadata store
+
+The Pulsar metadata store maintains all the metadata of a Pulsar cluster, such as topic metadata, schema, broker load data, and so on. Pulsar uses [Apache ZooKeeper](https://zookeeper.apache.org/) for metadata storage, cluster configuration, and coordination. The Pulsar metadata store can be deployed on a separate ZooKeeper cluster or deployed on an existing ZooKeeper cluster. You can use one ZooKeeper cluster for both Pulsar metadata store and [BookKeeper metadata store](https://bookkee [...]
+
+In a Pulsar instance:
+
+* A configuration store quorum stores configuration for tenants, namespaces, and other entities that need to be globally consistent.
+* Each cluster has its own local ZooKeeper ensemble that stores cluster-specific configuration and coordination such as which brokers are responsible for which topics as well as ownership metadata, broker load reports, BookKeeper ledger metadata, and more.
+
+## Configuration store
+
+The configuration store maintains all the configurations of a Pulsar instance, such as clusters, tenants, namespaces, partitioned topic related configurations, and so on. A Pulsar instance can have a single local cluster, multiple local clusters, or multiple cross-region clusters. Consequently, the configuration store can share the configurations across multiple clusters under a Pulsar instance. The configuration store can be deployed on a separate ZooKeeper cluster or deployed on an exi [...]
+
+## Persistent storage
+
+Pulsar provides guaranteed message delivery for applications. If a message successfully reaches a Pulsar broker, it will be delivered to its intended target.
+
+This guarantee requires that non-acknowledged messages are stored in a durable manner until they can be delivered to and acknowledged by consumers. This mode of messaging is commonly called *persistent messaging*. In Pulsar, N copies of all messages are stored and synced on disk, for example 4 copies across two servers with mirrored [RAID](https://en.wikipedia.org/wiki/RAID) volumes on each server.
+
+### Apache BookKeeper
+
+Pulsar uses a system called [Apache BookKeeper](http://bookkeeper.apache.org/) for persistent message storage. BookKeeper is a distributed [write-ahead log](https://en.wikipedia.org/wiki/Write-ahead_logging) (WAL) system that provides a number of crucial advantages for Pulsar:
+
+* It enables Pulsar to utilize many independent logs, called [ledgers](#ledgers). Multiple ledgers can be created for topics over time.
+* It offers very efficient storage for sequential data that handles entry replication.
+* It guarantees read consistency of ledgers in the presence of various system failures.
+* It offers even distribution of I/O across bookies.
+* It's horizontally scalable in both capacity and throughput. Capacity can be immediately increased by adding more bookies to a cluster.
+* Bookies are designed to handle thousands of ledgers with concurrent reads and writes. By using multiple disk devices---one for journal and another for general storage---bookies are able to isolate the effects of read operations from the latency of ongoing write operations.
+
+In addition to message data, *cursors* are also persistently stored in BookKeeper. Cursors are [subscription](reference-terminology.md#subscription) positions for [consumers](reference-terminology.md#consumer). BookKeeper enables Pulsar to store consumer position in a scalable fashion.
+
+At the moment, Pulsar supports persistent message storage. This accounts for the `persistent` in all topic names. Here's an example:
+
+```http
+
+persistent://my-tenant/my-namespace/my-topic
+
+```
+
+> Pulsar also supports ephemeral ([non-persistent](concepts-messaging.md#non-persistent-topics)) message storage.
+
+
+You can see an illustration of how brokers and bookies interact in the diagram below:
+
+![Brokers and bookies](/assets/broker-bookie.png)
+
+
+### Ledgers
+
+A ledger is an append-only data structure with a single writer that is assigned to multiple BookKeeper storage nodes, or bookies. Ledger entries are replicated to multiple bookies. Ledgers themselves have very simple semantics:
+
+* A Pulsar broker can create a ledger, append entries to the ledger, and close the ledger.
+* After the ledger has been closed---either explicitly or because the writer process crashed---it can then be opened only in read-only mode.
+* Finally, when entries in the ledger are no longer needed, the whole ledger can be deleted from the system (across all bookies).
+
+#### Ledger read consistency
+
+The main strength of BookKeeper is that it guarantees read consistency in ledgers in the presence of failures. Since the ledger can only be written to by a single process, that process is free to append entries very efficiently, without needing to obtain consensus. After a failure, the ledger will go through a recovery process that will finalize the state of the ledger and establish which entry was last committed to the log. After that point, all readers of the ledger are guaranteed to see [...]
+
+#### Managed ledgers
+
+Given that BookKeeper ledgers provide a single log abstraction, a library was developed on top of the ledger called the *managed ledger* that represents the storage layer for a single topic. A managed ledger represents the abstraction of a stream of messages with a single writer that keeps appending at the end of the stream and multiple cursors that are consuming the stream, each with its own associated position.
+
+Internally, a single managed ledger uses multiple BookKeeper ledgers to store the data. There are two reasons to have multiple ledgers:
+
+1. After a failure, a ledger is no longer writable and a new one needs to be created.
+2. A ledger can be deleted when all cursors have consumed the messages it contains. This allows for periodic rollover of ledgers.
+
+### Journal storage
+
+In BookKeeper, *journal* files contain BookKeeper transaction logs. Before making an update to a [ledger](#ledgers), a bookie needs to ensure that a transaction describing the update is written to persistent (non-volatile) storage. A new journal file is created once the bookie starts or the older journal file reaches the journal file size threshold (configured using the [`journalMaxSizeMB`](reference-configuration.md#bookkeeper-journalMaxSizeMB) parameter).
+
+## Pulsar proxy
+
+One way for Pulsar clients to interact with a Pulsar [cluster](#clusters) is by connecting to Pulsar message [brokers](#brokers) directly. In some cases, however, this kind of direct connection is either infeasible or undesirable because the client doesn't have direct access to broker addresses. If you're running Pulsar in a cloud environment or on [Kubernetes](https://kubernetes.io) or an analogous platform, for example, then direct client connections to brokers are likely not possible.
+
+The **Pulsar proxy** provides a solution to this problem by acting as a single gateway for all of the brokers in a cluster. If you run the Pulsar proxy (which, again, is optional), all client connections with the Pulsar cluster will flow through the proxy rather than communicating with brokers.
+
+> For the sake of performance and fault tolerance, you can run as many instances of the Pulsar proxy as you'd like.
+
+Architecturally, the Pulsar proxy gets all the information it requires from ZooKeeper. When starting the proxy on a machine, you only need to provide ZooKeeper connection strings for the cluster-specific and instance-wide configuration store clusters. Here's an example:
+
+```bash
+
+$ bin/pulsar proxy \
+  --zookeeper-servers zk-0,zk-1,zk-2 \
+  --configuration-store-servers zk-0,zk-1,zk-2
+
+```
+
+> #### Pulsar proxy docs
+> For documentation on using the Pulsar proxy, see the [Pulsar proxy admin documentation](administration-proxy).
+
+
+Some important things to know about the Pulsar proxy:
+
+* Connecting clients don't need to provide *any* specific configuration to use the Pulsar proxy. You won't need to update the client configuration for existing applications beyond updating the IP used for the service URL (for example if you're running a load balancer over the Pulsar proxy).
+* [TLS encryption](security-tls-transport.md) and [authentication](security-tls-authentication) are supported by the Pulsar proxy.
+
+## Service discovery
+
+[Clients](getting-started-clients) connecting to Pulsar brokers need to be able to communicate with an entire Pulsar instance using a single URL. Pulsar provides a built-in service discovery mechanism that you can set up using the instructions in the [Deploying a Pulsar instance](deploy-bare-metal.md#service-discovery-setup) guide.
+
+You can use your own service discovery system if you'd like. If you use your own system, there is just one requirement: when a client performs an HTTP request to an endpoint, such as `http://pulsar.us-west.example.com:8080`, the client needs to be redirected to *some* active broker in the desired cluster, whether via DNS, an HTTP or IP redirect, or some other means.
+
+The diagram below illustrates Pulsar service discovery:
+
+![alt-text](/assets/pulsar-service-discovery.png)
+
+In this diagram, the Pulsar cluster is addressable via a single DNS name: `pulsar-cluster.acme.com`. A [Python client](client-libraries-python), for example, could access this Pulsar cluster like this:
+
+```python
+
+from pulsar import Client
+
+client = Client('pulsar://pulsar-cluster.acme.com:6650')
+
+```
+
+:::note
+
+In Pulsar, each topic is handled by only one broker. Initial requests from a client to read, update or delete a topic are sent to a broker that may not be the topic owner. If the broker cannot handle the request for this topic, it redirects the request to the appropriate broker.
+
+:::
+
diff --git a/site2/website-next/versioned_docs/version-2.4.1/concepts-authentication.md b/site2/website-next/versioned_docs/version-2.4.1/concepts-authentication.md
new file mode 100644
index 0000000..b375ecb
--- /dev/null
+++ b/site2/website-next/versioned_docs/version-2.4.1/concepts-authentication.md
@@ -0,0 +1,9 @@
+---
+id: concepts-authentication
+title: Authentication and Authorization
+sidebar_label: "Authentication and Authorization"
+original_id: concepts-authentication
+---
+
+Pulsar supports a pluggable [authentication](security-overview.md) mechanism which can be configured at the proxy and/or the broker. Pulsar also supports a pluggable [authorization](security-authorization) mechanism. These mechanisms work together to identify the client and its access rights on topics, namespaces and tenants.
+
diff --git a/site2/website-next/versioned_docs/version-2.4.1/concepts-clients.md b/site2/website-next/versioned_docs/version-2.4.1/concepts-clients.md
new file mode 100644
index 0000000..b68f76a
--- /dev/null
+++ b/site2/website-next/versioned_docs/version-2.4.1/concepts-clients.md
@@ -0,0 +1,92 @@
+---
+id: concepts-clients
+title: Pulsar Clients
+sidebar_label: "Clients"
+original_id: concepts-clients
+---
+
+Pulsar exposes a client API with language bindings for [Java](client-libraries-java.md), [Go](client-libraries-go.md), [Python](client-libraries-python.md), [C++](client-libraries-cpp.md) and [C#](client-libraries-dotnet). The client API optimizes and encapsulates Pulsar's client-broker communication protocol and exposes a simple and intuitive API for use by applications.
+
+Under the hood, the current official Pulsar client libraries support transparent reconnection and/or connection failover to brokers, queuing of messages until acknowledged by the broker, and heuristics such as connection retries with backoff.
+
+> **Custom client libraries**
+> If you'd like to create your own client library, we recommend consulting the documentation on Pulsar's custom [binary protocol](developing-binary-protocol).
+
+
+## Client setup phase
+
+Before an application creates a producer/consumer, the Pulsar client library needs to initiate a setup phase including two steps:
+
+1. The client attempts to determine the owner of the topic by sending an HTTP lookup request to the broker. The request can reach any of the active brokers, which, by looking at the (cached) ZooKeeper metadata, knows who is serving the topic or, in case nobody is serving it, tries to assign the topic to the least loaded broker.
+1. Once the client library has the broker address, it creates a TCP connection (or reuses an existing connection from the pool) and authenticates it. Within this connection, the client and broker exchange binary commands from a custom protocol. At this point, the client sends a command to create a producer or consumer to the broker, which complies after validating the authorization policy.
+
+Whenever the TCP connection breaks, the client immediately re-initiates this setup phase and keeps trying with exponential backoff to re-establish the producer or consumer until the operation succeeds.
+
+## Reader interface
+
+In Pulsar, the "standard" [consumer interface](concepts-messaging.md#consumers) involves using consumers to listen on [topics](reference-terminology.md#topic), process incoming messages, and finally acknowledge those messages when they are processed. Whenever a new subscription is created, it is initially positioned at the end of the topic (by default), and consumers associated with that subscription begin reading with the first message created afterwards.  Whenever a consumer connects t [...]
+
+The **reader interface** for Pulsar enables applications to manually manage cursors. When you use a reader to connect to a topic---rather than a consumer---you need to specify *which* message the reader begins reading from when it connects to a topic. When connecting to a topic, the reader interface enables you to begin with:
+
+* The **earliest** available message in the topic
+* The **latest** available message in the topic
+* Some other message between the earliest and the latest. If you select this option, you'll need to explicitly provide a message ID. Your application will be responsible for "knowing" this message ID in advance, perhaps fetching it from a persistent data store or cache.
+
+The reader interface is helpful for use cases like using Pulsar to provide effectively-once processing semantics for a stream processing system. For this use case, it's essential that the stream processing system be able to "rewind" topics to a specific message and begin reading there. The reader interface provides Pulsar clients with the low-level abstraction necessary to "manually position" themselves within a topic.
+
+Internally, the reader interface is implemented as a consumer using an exclusive, non-durable subscription to the topic with a randomly-allocated name.
+
+> #### Important
+>
+> Unlike subscriptions/consumers, readers are non-durable in nature and do not prevent data in a topic from being deleted. Thus it is ***strongly*** advised that [data retention](cookbooks-retention-expiry) be configured. If data retention for a topic is not configured for an adequate amount of time, messages that the reader has not yet read might be deleted. This causes the readers to essentially skip messages. Configuring the data retention for a topic guarantees the reader with a cert [...]
+>
+> Please also note that a reader can have a "backlog", but the metric is only used for users to know how far behind the reader is. The metric is not considered for any backlog quota calculations.
+
+![The Pulsar consumer and reader interfaces](/assets/pulsar-reader-consumer-interfaces.png)
+
+Here's a Java example that begins reading from the earliest available message on a topic:
+
+```java
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Reader;
+
+// Create a reader on a topic and for a specific message (and onward)
+Reader<byte[]> reader = pulsarClient.newReader()
+    .topic("reader-api-test")
+    .startMessageId(MessageId.earliest)
+    .create();
+
+while (true) {
+    Message<byte[]> message = reader.readNext();
+
+    // Process the message
+}
+
+```
+
+To create a reader that reads from the latest available message:
+
+```java
+
+Reader<byte[]> reader = pulsarClient.newReader()
+    .topic(topic)
+    .startMessageId(MessageId.latest)
+    .create();
+
+```
+
+To create a reader that reads from some message between the earliest and the latest:
+
+```java
+
+byte[] msgIdBytes = // Some byte array
+MessageId id = MessageId.fromByteArray(msgIdBytes);
+Reader<byte[]> reader = pulsarClient.newReader()
+    .topic(topic)
+    .startMessageId(id)
+    .create();
+
+```
+
diff --git a/site2/website-next/versioned_docs/version-2.4.1/concepts-messaging.md b/site2/website-next/versioned_docs/version-2.4.1/concepts-messaging.md
new file mode 100644
index 0000000..89e13d1
--- /dev/null
+++ b/site2/website-next/versioned_docs/version-2.4.1/concepts-messaging.md
@@ -0,0 +1,438 @@
+---
+id: concepts-messaging
+title: Messaging Concepts
+sidebar_label: "Messaging"
+original_id: concepts-messaging
+---
+
+Pulsar is built on the [publish-subscribe](https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern) pattern, aka pub-sub. In this pattern, [producers](#producers) publish messages to [topics](#topics). [Consumers](#consumers) can then [subscribe](#subscription-types) to those topics, process incoming messages, and send an acknowledgement when processing is complete.
+
+Once a subscription has been created, all messages will be [retained](concepts-architecture-overview.md#persistent-storage) by Pulsar, even if the consumer gets disconnected. Retained messages will be discarded only when a consumer acknowledges that they've been successfully processed.
+
+## Messages
+
+Messages are the basic "unit" of Pulsar. They're what producers publish to topics and what consumers then consume from topics (and acknowledge when the message has been processed). Messages are the analogue of letters in a postal service system.
+
+Component | Purpose
+:---------|:-------
+Value / data payload | The data carried by the message. All Pulsar messages carry raw bytes, although message data can also conform to data [schemas](schema-get-started)
+Key | Messages can optionally be tagged with keys, which can be useful for things like [topic compaction](concepts-topic-compaction)
+Properties | An optional key/value map of user-defined properties
+Producer name | The name of the producer that produced the message (producers are automatically given default names, but you can apply your own explicitly as well)
+Sequence ID | Each Pulsar message belongs to an ordered sequence on its topic. A message's sequence ID is its ordering in that sequence.
+Publish time | The timestamp of when the message was published (automatically applied by the producer)
+Event time | An optional timestamp that applications can attach to the message representing when something happened, e.g. when the message was processed. The event time of a message is 0 if none is explicitly set.
+
+
+> For a more in-depth breakdown of Pulsar message contents, see the documentation on Pulsar's [binary protocol](developing-binary-protocol).
+
+## Producers
+
+A producer is a process that attaches to a topic and publishes messages to a Pulsar [broker](reference-terminology.md#broker) for processing.
+
+### Send modes
+
+Producers can send messages to brokers either synchronously (sync) or asynchronously (async).
+
+| Mode       | Description                                                                                                                                                                                                                                                                                                                                                              |
+|:-----------|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| Sync send  | The producer will wait for acknowledgement from the broker after sending each message. If acknowledgment isn't received then the producer will consider the send operation a failure.                                                                                                                                                                                    |
+| Async send | The producer will put the message in a blocking queue and return immediately. The client library will then send the message to the broker in the background. If the queue is full (max size is [configurable](reference-configuration.md#broker)), the producer could be blocked or fail immediately when calling the API, depending on arguments passed to the producer. |
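+
+As an illustrative sketch of the two modes with the [Java client](client-libraries-java) (the service URL and topic name are placeholders):
+
+```java
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+
+PulsarClient client = PulsarClient.builder()
+        .serviceUrl("pulsar://localhost:6650")
+        .build();
+
+Producer<byte[]> producer = client.newProducer()
+        .topic("my-topic")
+        .create();
+
+// Sync send: blocks until the broker acknowledges the message
+MessageId msgId = producer.send("my-sync-message".getBytes());
+
+// Async send: returns immediately; the future completes once the
+// broker acknowledges the message (or the send fails)
+producer.sendAsync("my-async-message".getBytes())
+        .thenAccept(messageId -> System.out.println("Persisted: " + messageId));
+
+```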
+
+### Compression
+
+Messages published by producers can be compressed during transportation in order to save bandwidth. Pulsar currently supports the following types of compression:
+
+* [LZ4](https://github.com/lz4/lz4)
+* [ZLIB](https://zlib.net/)
+* [ZSTD](https://facebook.github.io/zstd/)
+* [SNAPPY](https://google.github.io/snappy/)
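+
+For illustration, compression is selected per producer in the Java client; the choice of `LZ4` below is arbitrary:
+
+```java
+
+import org.apache.pulsar.client.api.CompressionType;
+
+// "client" is an already-built PulsarClient
+Producer<byte[]> compressedProducer = client.newProducer()
+        .topic("my-topic")
+        .compressionType(CompressionType.LZ4) // or ZLIB, ZSTD, SNAPPY
+        .create();
+
+```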
+
+### Batching
+
+If batching is enabled, the producer will accumulate and send a batch of messages in a single request. Batching size is defined by the maximum number of messages and maximum publish latency.
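+
+A sketch of how both limits are set on a Java producer; the values below are arbitrary examples:
+
+```java
+
+import java.util.concurrent.TimeUnit;
+
+// "client" is an already-built PulsarClient
+Producer<byte[]> batchingProducer = client.newProducer()
+        .topic("my-topic")
+        .enableBatching(true)
+        .batchingMaxMessages(100)                           // flush after 100 messages...
+        .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) // ...or after 10 ms, whichever comes first
+        .create();
+
+```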
+
+## Consumers
+
+A consumer is a process that attaches to a topic via a subscription and then receives messages.
+
+### Receive modes
+
+Messages can be received from [brokers](reference-terminology.md#broker) either synchronously (sync) or asynchronously (async).
+
+| Mode          | Description                                                                                                                                                                                                   |
+|:--------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| Sync receive  | A sync receive will be blocked until a message is available.                                                                                                                                                  |
+| Async receive | An async receive will return immediately with a future value---a [`CompletableFuture`](http://www.baeldung.com/java-completablefuture) in Java, for example---that completes once a new message is available. |
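+
+A minimal sketch of both modes with the Java client, assuming an already-subscribed `consumer`:
+
+```java
+
+// Sync receive: blocks until a message is available
+Message<byte[]> msg = consumer.receive();
+consumer.acknowledge(msg);
+
+// Async receive: returns a CompletableFuture that completes
+// once a new message is available
+consumer.receiveAsync().thenAccept(m -> {
+    // process the message, then acknowledge it
+    consumer.acknowledgeAsync(m);
+});
+
+```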
+
+### Listeners
+
+Client libraries provide listener implementations for consumers. For example, the [Java client](client-libraries-java) provides a {@inject: javadoc:MessageListener:/client/org/apache/pulsar/client/api/MessageListener} interface. In this interface, the `received` method is called whenever a new message is received.
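+
+For example, a sketch of registering a listener on the Java consumer builder (topic and subscription names are placeholders):
+
+```java
+
+// "client" is an already-built PulsarClient; the lambda implements
+// MessageListener.received and runs for every incoming message
+Consumer<byte[]> listenerConsumer = client.newConsumer()
+        .topic("my-topic")
+        .subscriptionName("my-subscription")
+        .messageListener((consumer, msg) -> {
+            System.out.println("Received: " + new String(msg.getData()));
+            consumer.acknowledgeAsync(msg);
+        })
+        .subscribe();
+
+```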
+
+### Acknowledgement
+
+When a consumer has consumed a message successfully, the consumer sends an acknowledgement request to the broker, so that the broker will discard the message. Otherwise, it [stores](concepts-architecture-overview.md#persistent-storage) the message.
+
+Messages can be acknowledged either one by one or cumulatively. With cumulative acknowledgement, the consumer only needs to acknowledge the last message it received. All messages in the stream up to (and including) the provided message will not be re-delivered to that consumer.
+
+
+> Cumulative acknowledgement cannot be used with [shared subscription type](#subscription-types), because shared mode involves multiple consumers having access to the same subscription.
+
+In the shared subscription type, messages are acknowledged individually.
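+
+A sketch of both acknowledgement styles in the Java client, assuming `msg` was received from `consumer`:
+
+```java
+
+// Individual acknowledgement: acknowledges only this message
+consumer.acknowledge(msg);
+
+// Cumulative acknowledgement: acknowledges this message and all
+// earlier messages in the stream (not allowed in the shared type)
+consumer.acknowledgeCumulative(msg);
+
+```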
+
+### Negative acknowledgement
+
+When a consumer fails to consume a message and wants to consume it again, the consumer can send a negative acknowledgement to the broker, and the broker will redeliver the message.
+
+Messages can be negatively acknowledged either individually or cumulatively, depending on the subscription type.
+
+In the exclusive and failover subscription types, consumers only negatively acknowledge the last message they have received.
+
+In the shared and Key_Shared subscription types, you can negatively acknowledge messages individually.
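+
+A sketch of negative acknowledgement with the Java client, assuming an already-subscribed `consumer`:
+
+```java
+
+Message<byte[]> msg = consumer.receive();
+try {
+    // process the message ...
+    consumer.acknowledge(msg);
+} catch (Exception e) {
+    // processing failed: ask the broker to redeliver this message
+    consumer.negativeAcknowledge(msg);
+}
+
+```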
+
+### Acknowledgement timeout
+
+When a message is not consumed successfully, and you want the broker to redeliver the message automatically, you can adopt the automatic re-delivery mechanism for unacknowledged messages. The client tracks the unacknowledged messages within the entire `acktimeout` time range, and sends a `redeliver unacknowledged messages` request to the broker automatically when the acknowledgement timeout expires.
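+
+As a sketch, the timeout is set on the Java consumer builder; the 10-second value below is an arbitrary example:
+
+```java
+
+import java.util.concurrent.TimeUnit;
+
+// "client" is an already-built PulsarClient
+Consumer<byte[]> consumer = client.newConsumer()
+        .topic("my-topic")
+        .subscriptionName("my-subscription")
+        .ackTimeout(10, TimeUnit.SECONDS) // redeliver messages left unacknowledged after 10s
+        .subscribe();
+
+```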
+
+:::note
+
+Use negative acknowledgement in preference to acknowledgement timeout. Negative acknowledgement controls re-delivery of individual messages with more precision, and avoids invalid redeliveries when the message processing time exceeds the acknowledgement timeout.
+
+:::
+
+### Dead letter topic
+
+Dead letter topic enables you to consume new messages when some messages cannot be consumed successfully by a consumer. In this mechanism, messages that fail to be consumed are stored in a separate topic, which is called the dead letter topic. You can decide how to handle messages in the dead letter topic.
+
+The following example shows how to enable dead letter topic in a Java client using the default dead letter topic:
+
+```java
+
+Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+              .topic(topic)
+              .subscriptionName("my-subscription")
+              .subscriptionType(SubscriptionType.Shared)
+              .deadLetterPolicy(DeadLetterPolicy.builder()
+                    .maxRedeliverCount(maxRedeliveryCount)
+                    .build())
+              .subscribe();
+
+```
+
+The default dead letter topic uses this format: 
+
+```
+
+<topicname>-<subscriptionname>-DLQ
+
+```
+
+  
+If you want to specify the name of the dead letter topic, use this Java client example:
+
+```java
+
+Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
+              .topic(topic)
+              .subscriptionName("my-subscription")
+              .subscriptionType(SubscriptionType.Shared)
+              .deadLetterPolicy(DeadLetterPolicy.builder()
+                    .maxRedeliverCount(maxRedeliveryCount)
+                    .deadLetterTopic("your-topic-name")
+                    .build())
+              .subscribe();
+
+```
+
+Dead letter topic depends on message re-delivery. Messages are redelivered either due to [acknowledgement timeout](#acknowledgement-timeout) or [negative acknowledgement](#negative-acknowledgement). If you are going to use negative acknowledgement on a message, make sure it is negatively acknowledged before the acknowledgement timeout. 
+
+:::note
+
+Currently, dead letter topic is enabled only in the shared subscription type.
+
+:::
+
+## Topics
+
+As in other pub-sub systems, topics in Pulsar are named channels for transmitting messages from [producers](reference-terminology.md#producer) to [consumers](reference-terminology.md#consumer). Topic names are URLs that have a well-defined structure:
+
+```http
+
+{persistent|non-persistent}://tenant/namespace/topic
+
+```
+
+Topic name component | Description
+:--------------------|:-----------
+`persistent` / `non-persistent` | This identifies the type of topic. Pulsar supports two kinds of topics: [persistent](concepts-architecture-overview.md#persistent-storage) and [non-persistent](#non-persistent-topics) (persistent is the default, so if you don't specify a type the topic will be persistent). With persistent topics, all messages are durably [persisted](concepts-architecture-overview.md#persistent-storage) on disk (that means on multiple disks unless the broker is standalone) [...]
+`tenant`             | The topic's tenant within the instance. Tenants are essential to multi-tenancy in Pulsar and can be spread across clusters.
+`namespace`          | The administrative unit of the topic, which acts as a grouping mechanism for related topics. Most topic configuration is performed at the [namespace](#namespaces) level. Each tenant can have multiple namespaces.
+`topic`              | The final part of the name. Topic names are freeform and have no special meaning in a Pulsar instance.
+
+
+> #### No need to explicitly create new topics
+> You don't need to explicitly create topics in Pulsar. If a client attempts to write or receive messages to/from a topic that does not yet exist, Pulsar will automatically create that topic under the [namespace](#namespaces) provided in the [topic name](#topics).
+> If no tenant or namespace is specified when a client creates a topic, the topic is created in the default tenant and namespace. You can also create a topic in a specified tenant and namespace, such as `persistent://my-tenant/my-namespace/my-topic`. `persistent://my-tenant/my-namespace/my-topic` means the `my-topic` topic is created in the `my-namespace` namespace of the `my-tenant` tenant.
+
+
+## Namespaces
+
+A namespace is a logical grouping within a tenant. A tenant can create multiple namespaces via the [admin API](admin-api-namespaces.md#create). For instance, a tenant with different applications can create a separate namespace for each application. A namespace allows the application to create and manage a hierarchy of topics. For example, `my-tenant/app1` is the namespace for the application `app1` of the tenant `my-tenant`. You can create any number of [topics](#topics) under the namespace.
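+
+For illustration, a namespace such as `my-tenant/app1` could be created with the Java admin client (the admin service URL is a placeholder):
+
+```java
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+
+PulsarAdmin admin = PulsarAdmin.builder()
+        .serviceHttpUrl("http://localhost:8080")
+        .build();
+
+// Create the "app1" namespace under the "my-tenant" tenant
+admin.namespaces().createNamespace("my-tenant/app1");
+
+```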
+
+## Subscription types
+
+A subscription is a named configuration rule that determines how messages are delivered to consumers. There are four available subscription types in Pulsar: [exclusive](#exclusive), [shared](#shared), [failover](#failover), and [Key_Shared](#key_shared). The first three are illustrated in the figure below.
+
+![Subscription types](/assets/pulsar-subscription-types.png)
+
+### Exclusive
+
+In *exclusive* type, only a single consumer is allowed to attach to the subscription. If more than one consumer attempts to subscribe to a topic using the same subscription, the consumer receives an error.
+
+In the diagram below, only **Consumer A-0** is allowed to consume messages.
+
+> Exclusive is the default subscription type.
+
+![Exclusive subscriptions](/assets/pulsar-exclusive-subscriptions.png)
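+
+For illustration, the subscription type is chosen on the Java consumer builder (exclusive is the default, so the explicit call below is shown only for clarity; names are placeholders):
+
+```java
+
+// "client" is an already-built PulsarClient
+Consumer<byte[]> exclusiveConsumer = client.newConsumer()
+        .topic("my-topic")
+        .subscriptionName("my-subscription")
+        .subscriptionType(SubscriptionType.Exclusive)
+        .subscribe();
+
+```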
+
+### Failover
+
+In *Failover* type, multiple consumers can attach to the same subscription. The broker selects the master consumer based on the priority level and the lexicographical sorting of consumer names: if two consumers have an identical priority level, the broker selects the master consumer based on the lexicographical sorting; if the two consumers have different priority levels, the broker selects the consumer with the higher priority level as the master consumer. The master [...]
+
+For partitioned topics, the broker assigns partitioned topics to the consumer with the highest priority level. If multiple consumers have the highest priority level, the broker evenly assigns topics among these consumers.
+
+In the diagram below, **Consumer-B-0** is the master consumer while **Consumer-B-1** would be the next consumer in line to receive messages if **Consumer-B-0** is disconnected.
+
+![Failover subscriptions](/assets/pulsar-failover-subscriptions.png)
+
+### Shared
+
+In *shared* or *round robin* mode, multiple consumers can attach to the same subscription. Messages are delivered in a round robin distribution across consumers, and any given message is delivered to only one consumer. When a consumer disconnects, all the messages that were sent to it and not acknowledged will be rescheduled for sending to the remaining consumers.
+
+In the diagram below, **Consumer-C-1** and **Consumer-C-2** are able to subscribe to the topic, but **Consumer-C-3** and others could as well.
+
+> #### Limitations of shared mode
+> There are two important things to be aware of when using shared mode:
+> * Message ordering is not guaranteed.
+> * You cannot use cumulative acknowledgment with shared mode.
+
+![Shared subscriptions](/assets/pulsar-shared-subscriptions.png)
+
+### Key_Shared
+
+In *Key_Shared* mode, multiple consumers can attach to the same subscription. Messages are delivered in a distribution across consumers, and messages with the same key or the same ordering key are delivered to only one consumer. No matter how many times a message is re-delivered, it is delivered to the same consumer. When a consumer connects or disconnects, the consumer serving some message keys changes.
+
+> #### Limitations of Key_Shared mode
+> There are two important things to be aware of when using Key_Shared mode:
+> * You need to specify a key or orderingKey for messages.
+> * You cannot use cumulative acknowledgment with Key_Shared mode.
+
+![Key_Shared subscriptions](/assets/pulsar-key-shared-subscriptions.png)
+
+**Key_Shared subscription is a beta feature. You can disable it in `broker.conf`.**
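+
+A sketch of the mode in the Java client: the producer attaches a key to each message, and consumers subscribe with `SubscriptionType.Key_Shared` (topic, key, and subscription names are illustrative):
+
+```java
+
+// Producer side: attach a key so that all messages for "customer-42"
+// are delivered to the same consumer
+producer.newMessage()
+        .key("customer-42")
+        .value("some-payload".getBytes())
+        .send();
+
+// Consumer side: several consumers can attach to the same subscription
+Consumer<byte[]> keySharedConsumer = client.newConsumer()
+        .topic("my-topic")
+        .subscriptionName("my-subscription")
+        .subscriptionType(SubscriptionType.Key_Shared)
+        .subscribe();
+
+```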
+
+## Multi-topic subscriptions
+
+When a consumer subscribes to a Pulsar topic, by default it subscribes to one specific topic, such as `persistent://public/default/my-topic`. As of Pulsar version 1.23.0-incubating, however, Pulsar consumers can simultaneously subscribe to multiple topics. You can define a list of topics in two ways:
+
+* On the basis of a [**reg**ular **ex**pression](https://en.wikipedia.org/wiki/Regular_expression) (regex), for example `persistent://public/default/finance-.*`
+* By explicitly defining a list of topics
+
+> When subscribing to multiple topics by regex, all topics must be in the same [namespace](#namespaces)
+
+When subscribing to multiple topics, the Pulsar client will automatically make a call to the Pulsar API to discover the topics that match the regex pattern/list and then subscribe to all of them. If any of the topics don't currently exist, the consumer will auto-subscribe to them once the topics are created.
+
+> #### No ordering guarantees across multiple topics
+> When a producer sends messages to a single topic, all messages are guaranteed to be read from that topic in the same order. However, these guarantees do not hold across multiple topics. So when a producer sends messages to multiple topics, the order in which messages are read from those topics is not guaranteed to be the same.
+
+Here are some multi-topic subscription examples for Java:
+
+```java
+
+import java.util.regex.Pattern;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.PulsarClient;
+
+PulsarClient pulsarClient = // Instantiate Pulsar client object
+
+// Subscribe to all topics in a namespace
+Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
+Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
+                .topicsPattern(allTopicsInNamespace)
+                .subscriptionName("subscription-1")
+                .subscribe();
+
+// Subscribe to a subset of topics in a namespace, based on regex
+Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
+Consumer<byte[]> someTopicsConsumer = pulsarClient.newConsumer()
+                .topicsPattern(someTopicsInNamespace)
+                .subscriptionName("subscription-1")
+                .subscribe();
+
+```
+
+For code examples, see:
+
+* [Java](client-libraries-java.md#multi-topic-subscriptions)
+
+## Partitioned topics
+
+Normal topics can be served only by a single broker, which limits the topic's maximum throughput. *Partitioned topics* are a special type of topic that can be handled by multiple brokers, which allows for much higher throughput.
+
+Behind the scenes, a partitioned topic is actually implemented as N internal topics, where N is the number of partitions. When publishing messages to a partitioned topic, each message is routed to one of several brokers. The distribution of partitions across brokers is handled automatically by Pulsar.
+
+The diagram below illustrates this:
+
+![](/assets/partitioning.png)
+
+Here, the topic **Topic1** has five partitions (**P0** through **P4**) split across three brokers. Because there are more partitions than brokers, two brokers handle two partitions apiece, while the third handles only one (again, Pulsar handles this distribution of partitions automatically).
+
+Messages for this topic are broadcast to two consumers. The [routing mode](#routing-modes) determines which partition each message is published to, while the [subscription type](#subscription-types) determines which messages go to which consumers.
+
+Decisions about routing and subscription types can be made separately in most cases. In general, throughput concerns should guide partitioning/routing decisions while subscription decisions should be guided by application semantics.
+
+There is no difference between partitioned topics and normal topics in terms of how subscription types work, as partitioning only determines what happens between when a message is published by a producer and processed and acknowledged by a consumer.
+
+Partitioned topics need to be explicitly created via the [admin API](admin-api-overview). The number of partitions can be specified when creating the topic.
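+
+For example, a sketch using the Java admin client (assuming an already-built `PulsarAdmin` named `admin`; the topic name and partition count are placeholders):
+
+```java
+
+// Create a topic with 4 partitions via the admin API
+admin.topics().createPartitionedTopic(
+        "persistent://my-tenant/my-namespace/my-partitioned-topic", 4);
+
+```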
+
+### Routing modes
+
+When publishing to partitioned topics, you must specify a *routing mode*. The routing mode determines which partition---that is, which internal topic---each message should be published to.
+
+There are three {@inject: javadoc:MessageRoutingMode:/client/org/apache/pulsar/client/api/MessageRoutingMode} available:
+
+Mode     | Description 
+:--------|:------------
+`RoundRobinPartition` | If no key is provided, the producer publishes messages across all partitions in round-robin fashion to achieve maximum throughput. Note that round-robin is not applied per individual message; rather, it is applied at the boundary of the batching delay, to ensure batching is effective. If a key is specified on the message, the partitioned producer hashes the key and assigns the message to a particular partition. This is the default mode. 
+`SinglePartition`     | If no key is provided, the producer randomly picks one single partition and publishes all messages to that partition. If a key is specified on the message, the partitioned producer hashes the key and assigns the message to a particular partition.
+`CustomPartition`     | Use a custom message router implementation that is called to determine the partition for a particular message. You can create a custom routing mode by using the [Java client](client-libraries-java) and implementing the {@inject: javadoc:MessageRouter:/client/org/apache/pulsar/client/api/MessageRouter} interface.
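+
+A sketch of selecting a routing mode on a Java producer (`SinglePartition` below is just one of the three modes):
+
+```java
+
+import org.apache.pulsar.client.api.MessageRoutingMode;
+
+// "client" is an already-built PulsarClient
+Producer<byte[]> partitionedProducer = client.newProducer()
+        .topic("persistent://my-tenant/my-namespace/my-partitioned-topic")
+        .messageRoutingMode(MessageRoutingMode.SinglePartition)
+        .create();
+
+```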
+
+### Ordering guarantee
+
+The ordering of messages is related to the message routing mode and the message key. Usually, users want a per-key-partition ordering guarantee.
+
+If there is a key attached to a message, the messages will be routed to corresponding partitions based on the hashing scheme specified by {@inject: javadoc:HashingScheme:/client/org/apache/pulsar/client/api/HashingScheme} in {@inject: javadoc:ProducerBuilder:/client/org/apache/pulsar/client/api/ProducerBuilder}, when using either `SinglePartition` or `RoundRobinPartition` mode.
+
+Ordering guarantee | Description | Routing Mode and Key
+:------------------|:------------|:------------
+Per-key-partition  | All the messages with the same key will be in order and be placed in same partition. | Use either `SinglePartition` or `RoundRobinPartition` mode, and Key is provided by each message.
+Per-producer       | All the messages from the same producer will be in order. | Use `SinglePartition` mode, and no Key is provided for each message.
+
+### Hashing scheme
+
+{@inject: javadoc:HashingScheme:/client/org/apache/pulsar/client/api/HashingScheme} is an enum that represents the set of standard hashing functions available when choosing the partition for a particular message.
+
+There are two standard hashing functions available: `JavaStringHash` and `Murmur3_32Hash`.
+The default hashing function for producers is `JavaStringHash`.
+Note that `JavaStringHash` is not useful when producers can come from multiple language clients; in that case, it is recommended to use `Murmur3_32Hash`.
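+
+For illustration, the hashing scheme is also chosen on the producer builder:
+
+```java
+
+import org.apache.pulsar.client.api.HashingScheme;
+
+// "client" is an already-built PulsarClient
+Producer<byte[]> producer = client.newProducer()
+        .topic("my-topic")
+        .hashingScheme(HashingScheme.Murmur3_32Hash) // safer when producers span multiple languages
+        .create();
+
+```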
+
+
+
+## Non-persistent topics
+
+
+By default, Pulsar persistently stores *all* unacknowledged messages on multiple [BookKeeper](concepts-architecture-overview.md#persistent-storage) bookies (storage nodes). Data for messages on persistent topics can thus survive broker restarts and subscriber failover.
+
+Pulsar also, however, supports **non-persistent topics**, which are topics on which messages are *never* persisted to disk and live only in memory. When using non-persistent delivery, killing a Pulsar broker or disconnecting a subscriber to a topic means that all in-transit messages are lost on that (non-persistent) topic, meaning that clients may see message loss.
+
+Non-persistent topics have names of this form (note the `non-persistent` in the name):
+
+```http
+
+non-persistent://tenant/namespace/topic
+
+```
+
+> For more info on using non-persistent topics, see the [Non-persistent messaging cookbook](cookbooks-non-persistent).
+
+In non-persistent topics, brokers immediately deliver messages to all connected subscribers *without persisting them* in [BookKeeper](concepts-architecture-overview.md#persistent-storage). If a subscriber is disconnected, the broker will not be able to deliver those in-transit messages, and subscribers will never be able to receive those messages again. Eliminating the persistent storage step makes messaging on non-persistent topics slightly faster than on persistent topics in some cases [...]
+
+> With non-persistent topics, message data lives only in memory. If a message broker fails or message data can otherwise not be retrieved from memory, your message data may be lost. Use non-persistent topics only if you're *certain* that your use case requires it and can sustain it.
+
+By default, non-persistent topics are enabled on Pulsar brokers. You can disable them in the broker's [configuration](reference-configuration.md#broker-enableNonPersistentTopics). You can manage non-persistent topics using the [`pulsar-admin topics`](reference-pulsar-admin/#topics-1) interface.
+
+### Performance
+
+Non-persistent messaging is usually faster than persistent messaging because brokers don't persist messages and immediately send acks back to the producer as soon as the message is delivered to connected consumers. Producers thus see comparatively low publish latency with non-persistent topics.
+
+### Client API
+
+Producers and consumers can connect to non-persistent topics in the same way as persistent topics, with the crucial difference that the topic name must start with `non-persistent`. All three subscription types---[exclusive](#exclusive), [shared](#shared), and [failover](#failover)---are supported for non-persistent topics.
+
+Here's an example [Java consumer](client-libraries-java.md#consumers) for a non-persistent topic:
+
+```java
+
+PulsarClient client = PulsarClient.builder()
+        .serviceUrl("pulsar://localhost:6650")
+        .build();
+String npTopic = "non-persistent://public/default/my-topic";
+String subscriptionName = "my-subscription-name";
+
+Consumer<byte[]> consumer = client.newConsumer()
+        .topic(npTopic)
+        .subscriptionName(subscriptionName)
+        .subscribe();
+
+```
+
+Here's an example [Java producer](client-libraries-java.md#producer) for the same non-persistent topic:
+
+```java
+
+Producer<byte[]> producer = client.newProducer()
+                .topic(npTopic)
+                .create();
+
+```
+
+## Message retention and expiry
+
+By default, Pulsar message brokers:
+
+* immediately delete *all* messages that have been acknowledged by a consumer, and
+* [persistently store](concepts-architecture-overview.md#persistent-storage) all unacknowledged messages in a message backlog.
+
+Pulsar has two features, however, that enable you to override this default behavior:
+
+* Message **retention** enables you to store messages that have been acknowledged by a consumer
+* Message **expiry** enables you to set a time to live (TTL) for messages that have not yet been acknowledged
+
+> All message retention and expiry is managed at the [namespace](#namespaces) level. For a how-to, see the [Message retention and expiry](cookbooks-retention-expiry) cookbook.
+
+The diagram below illustrates both concepts:
+
+![Message retention and expiry](/assets/retention-expiry.png)
+
+With message retention, shown at the top, a <span style={{color: " #89b557"}}>retention policy</span> applied to all topics in a namespace dictates that some messages are durably stored in Pulsar even though they've already been acknowledged. Acknowledged messages that are not covered by the retention policy are <span style={{color: " #bb3b3e"}}>deleted</span>. Without a retention policy, *all* of the <span style={{color: " #19967d"}}>acknowledged messages</span> would be deleted.
+
+With message expiry, shown at the bottom, some messages are <span style={{color: " #bb3b3e"}}>deleted</span>, even though they <span style={{color: " #337db6"}}>haven't been acknowledged</span>, because they've expired according to the <span style={{color: " #e39441"}}>TTL applied to the namespace</span> (for example because a TTL of 5 minutes has been applied and the messages haven't been acknowledged but are 10 minutes old).
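+
+As a sketch with the Java admin client (assuming an already-built `PulsarAdmin` named `admin`; the values are arbitrary examples):
+
+```java
+
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+
+// Retention: keep acknowledged messages for up to 24 hours or 512 MB
+admin.namespaces().setRetention("my-tenant/my-namespace",
+        new RetentionPolicies(24 * 60, 512)); // (minutes, megabytes)
+
+// Expiry: give unacknowledged messages a 5-minute TTL
+admin.namespaces().setNamespaceMessageTTL("my-tenant/my-namespace", 5 * 60); // seconds
+
+```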
+
+## Message deduplication
+
+Message **duplication** occurs when a message is [persisted](concepts-architecture-overview.md#persistent-storage) by Pulsar more than once. Message **deduplication** is an optional Pulsar feature that prevents unnecessary message duplication by processing each message only once, *even if the message is received more than once*.
+
+The following diagram illustrates what happens when message deduplication is disabled vs. enabled:
+
+![Pulsar message deduplication](/assets/message-deduplication.png)
+
+
+Message deduplication is disabled in the scenario shown at the top. Here, a producer publishes message 1 on a topic; the message reaches a Pulsar broker and is [persisted](concepts-architecture-overview.md#persistent-storage) to BookKeeper. The producer then sends message 1 again (in this case due to some retry logic), and the message is received by the broker and stored in BookKeeper again, which means that duplication has occurred.
+
+In the second scenario at the bottom, the producer publishes message 1, which is received by the broker and persisted, as in the first scenario. When the producer attempts to publish the message again, however, the broker knows that it has already seen message 1 and thus does not persist the message.
+
+> Message deduplication is handled at the namespace level. For more instructions, see the [message deduplication cookbook](cookbooks-deduplication).
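+
+A sketch of enabling the feature for a namespace with the Java admin client (assuming an already-built `PulsarAdmin` named `admin`):
+
+```java
+
+// Enable broker-side deduplication for all topics in this namespace
+admin.namespaces().setDeduplicationStatus("my-tenant/my-namespace", true);
+
+```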
+
+
+### Producer idempotency
+
+The other available approach to message deduplication is to ensure that each message is *only produced once*. This approach is typically called **producer idempotency**. The drawback of this approach is that it defers the work of message deduplication to the application. In Pulsar, this is handled at the [broker](reference-terminology.md#broker) level, which means that you don't need to modify your Pulsar client code. Instead, you only need to make administrative changes (see the [Managing message deduplication](cookbooks-deduplication) cookbook for a guide).
+
+### Deduplication and effectively-once semantics
+
+Message deduplication makes Pulsar an ideal messaging system to be used in conjunction with stream processing engines (SPEs) and other systems seeking to provide [effectively-once](https://streaml.io/blog/exactly-once) processing semantics. Messaging systems that don't offer automatic message deduplication require the SPE or other system to guarantee deduplication, which means that strict message ordering comes at the cost of burdening the application with the responsibility of deduplication. With Pulsar, strict ordering guarantees come at no application-level cost.
+
+> More in-depth information can be found in [this post](https://streaml.io/blog/pulsar-effectively-once/) on the [Streamlio blog](https://streaml.io/blog)
+
+
diff --git a/site2/website-next/versioned_docs/version-2.4.1/concepts-multi-tenancy.md b/site2/website-next/versioned_docs/version-2.4.1/concepts-multi-tenancy.md
new file mode 100644
index 0000000..be752cc
--- /dev/null
+++ b/site2/website-next/versioned_docs/version-2.4.1/concepts-multi-tenancy.md
@@ -0,0 +1,59 @@
+---
+id: concepts-multi-tenancy
+title: Multi Tenancy
+sidebar_label: "Multi Tenancy"
+original_id: concepts-multi-tenancy
+---
+
+Pulsar was created from the ground up as a multi-tenant system. To support multi-tenancy, Pulsar has a concept of tenants. Tenants can be spread across clusters and can each have their own [authentication and authorization](security-overview) scheme applied to them. They are also the administrative unit at which storage quotas, [message TTL](cookbooks-retention-expiry.md#time-to-live-ttl), and isolation policies can be managed.
+
+The multi-tenant nature of Pulsar is reflected most visibly in topic URLs, which have this structure:
+
+```http
+
+persistent://tenant/namespace/topic
+
+```
+
+As you can see, the tenant is the most basic unit of categorization for topics (more fundamental than the namespace and topic name).
+
+## Tenants
+
+To each tenant in a Pulsar instance you can assign:
+
+* An [authorization](security-authorization) scheme
+* The set of [clusters](reference-terminology.md#cluster) to which the tenant's configuration applies
+
+## Namespaces
+
+Tenants and namespaces are two key concepts of Pulsar to support multi-tenancy.
+
+* Pulsar is provisioned for specified tenants with appropriate capacity allocated to the tenant.
+* A namespace is the administrative unit within a tenant. The configuration policies set on a namespace apply to all the topics created in that namespace. A tenant may create multiple namespaces via self-administration using the REST API and the [`pulsar-admin`](reference-pulsar-admin) CLI tool (for a programmatic example, see the sketch after the topic listing below). For instance, a tenant with different applications can create a separate namespace for each application.
+
+Names for topics in the same namespace will look like this:
+
+```http
+
+persistent://tenant/app1/topic-1
+
+persistent://tenant/app1/topic-2
+
+persistent://tenant/app1/topic-3
+
+```
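+
+As referenced above, a tenant and its namespaces can also be created with the Java admin client. This is a minimal sketch; the admin URL, role, and cluster names are placeholders:
+
+```java
+
+import java.util.Collections;
+import java.util.HashSet;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+
+PulsarAdmin admin = PulsarAdmin.builder()
+        .serviceHttpUrl("http://localhost:8080") // placeholder URL
+        .build();
+
+// Create the tenant with its admin roles and allowed clusters,
+// then a namespace per application.
+TenantInfo info = new TenantInfo(
+        new HashSet<>(Collections.singletonList("admin-role")),  // admin roles
+        new HashSet<>(Collections.singletonList("standalone"))); // allowed clusters
+admin.tenants().createTenant("tenant", info);
+admin.namespaces().createNamespace("tenant/app1");
+
+```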
+
+### Namespace change events and topic-level policies
+
+Pulsar is a multi-tenant event streaming system. Administrators can manage the tenants and namespaces by setting policies at different levels. However, the policies, such as the retention policy and storage quota policy, are only available at the namespace level. In many use cases, users need to set a policy at the topic level. The namespace change events approach is proposed for supporting topic-level policies in an efficient way. In this approach, Pulsar is used as an event log to store namespace change events (such as topic policy changes). This approach has a few benefits:
+
+- It avoids storing topic policies in ZooKeeper, which would introduce more load on ZooKeeper.
+- It uses Pulsar as an event log for propagating the policy cache, which scales efficiently.
+- It allows Pulsar SQL to query the namespace changes and audit the system.
+
+Each namespace has a system topic `__change_events`. This system topic is used for storing change events for a given namespace. The following figure illustrates how to use namespace change events to implement a topic-level policy.
+
+1. Pulsar Admin clients communicate with the Admin Restful API to update topic level policies.
+2. Any broker that receives the Admin HTTP request publishes a topic policy change event to the corresponding `__change_events` topic of the namespace.
+3. Each broker that owns one or more bundles of a namespace subscribes to the `__change_events` topic to receive change events of the namespace. It then applies the change events to the policy cache.
+4. Once the policy cache is updated, the broker sends the response back to the Pulsar Admin clients.
diff --git a/site2/website-next/versioned_docs/version-2.4.1/concepts-overview.md b/site2/website-next/versioned_docs/version-2.4.1/concepts-overview.md
new file mode 100644
index 0000000..b903fa4
--- /dev/null
+++ b/site2/website-next/versioned_docs/version-2.4.1/concepts-overview.md
@@ -0,0 +1,31 @@
+---
+id: concepts-overview
+title: Pulsar Overview
+sidebar_label: "Overview"
+original_id: concepts-overview
+---
+
+Pulsar is a multi-tenant, high-performance solution for server-to-server messaging. Originally developed by Yahoo, Pulsar is now under the stewardship of the [Apache Software Foundation](https://www.apache.org/).
+
+Key features of Pulsar are listed below:
+
+* Native support for multiple clusters in a Pulsar instance, with seamless [geo-replication](administration-geo) of messages across clusters.
+* Very low publish and end-to-end latency.
+* Seamless scalability to over a million topics.
+* A simple [client API](concepts-clients.md) with bindings for [Java](client-libraries-java.md), [Go](client-libraries-go.md), [Python](client-libraries-python.md) and [C++](client-libraries-cpp).
+* Multiple [subscription types](concepts-messaging.md#subscription-types) ([exclusive](concepts-messaging.md#exclusive), [shared](concepts-messaging.md#shared), and [failover](concepts-messaging.md#failover)) for topics.
+* Guaranteed message delivery with [persistent message storage](concepts-architecture-overview.md#persistent-storage) provided by [Apache BookKeeper](http://bookkeeper.apache.org/).
+* A serverless light-weight computing framework [Pulsar Functions](functions-overview) offers the capability for stream-native data processing.
+* A serverless connector framework [Pulsar IO](io-overview), which is built on Pulsar Functions, makes it easier to move data in and out of Apache Pulsar.
+* [Tiered Storage](concepts-tiered-storage) offloads data from hot/warm storage to cold/long-term storage (such as S3 and GCS) as the data ages out.
+
+## Contents
+
+- [Messaging Concepts](concepts-messaging)
+- [Architecture Overview](concepts-architecture-overview)
+- [Pulsar Clients](concepts-clients)
+- [Geo Replication](concepts-replication)
+- [Multi Tenancy](concepts-multi-tenancy)
+- [Authentication and Authorization](concepts-authentication)
+- [Topic Compaction](concepts-topic-compaction)
+- [Tiered Storage](concepts-tiered-storage)
diff --git a/site2/website-next/versioned_docs/version-2.4.1/concepts-replication.md b/site2/website-next/versioned_docs/version-2.4.1/concepts-replication.md
new file mode 100644
index 0000000..6e23962
--- /dev/null
+++ b/site2/website-next/versioned_docs/version-2.4.1/concepts-replication.md
@@ -0,0 +1,9 @@
+---
+id: concepts-replication
+title: Geo Replication
+sidebar_label: "Geo Replication"
+original_id: concepts-replication
+---
+
+Pulsar enables messages to be produced and consumed in different geo-locations. For instance, your application may be publishing data in one region or market and you would like to process it for consumption in other regions or markets. [Geo-replication](administration-geo) in Pulsar enables you to do that.
+
diff --git a/site2/website-next/versioned_docs/version-2.4.1/concepts-schema-registry.md b/site2/website-next/versioned_docs/version-2.4.1/concepts-schema-registry.md
new file mode 100644
index 0000000..72cb466
--- /dev/null
+++ b/site2/website-next/versioned_docs/version-2.4.1/concepts-schema-registry.md
@@ -0,0 +1,126 @@
+---
+id: concepts-schema-registry
+title: Schema Registry
+sidebar_label: "Schema Registry"
+original_id: concepts-schema-registry
+---
+
+Type safety is extremely important in any application built around a message bus like Pulsar. Producers and consumers need some kind of mechanism for coordinating types at the topic level lest a wide variety of potential problems arise (for example serialization and deserialization issues). Applications typically adopt one of two basic approaches to type safety in messaging:
+
+1. A "client-side" approach in which message producers and consumers are responsible for not only serializing and deserializing messages (which consist of raw bytes) but also "knowing" which types are being transmitted via which topics. If a producer is sending temperature sensor data on the topic `topic-1`, consumers of that topic will run into trouble if they attempt to parse that data as, say, moisture sensor readings.
+2. A "server-side" approach in which producers and consumers inform the system which data types can be transmitted via the topic. With this approach, the messaging system enforces type safety and ensures that producers and consumers remain synced.
+
+Both approaches are available in Pulsar, and you're free to adopt one or the other or to mix and match on a per-topic basis.
+
+1. For the "client-side" approach, producers and consumers can send and receive messages consisting of raw byte arrays and leave all type safety enforcement to the application on an "out-of-band" basis.
+1. For the "server-side" approach, Pulsar has a built-in **schema registry** that enables clients to upload data schemas on a per-topic basis. Those schemas dictate which data types are recognized as valid for that topic.
+
+> #### Note
+>
+> Currently, the Pulsar schema registry is only available for the [Java client](client-libraries-java.md), [CGo client](client-libraries-go.md), [Python client](client-libraries-python.md), and [C++ client](client-libraries-cpp).
+
+## Basic architecture
+
+Schemas are automatically uploaded when you create a typed Producer with a Schema. Additionally, Schemas can be manually uploaded to, fetched from, and updated via Pulsar's {@inject: rest:REST:tag/schemas} API.
+
+> #### Other schema registry backends
+> Out of the box, Pulsar uses the [Apache BookKeeper](concepts-architecture-overview#persistent-storage) log storage system for schema storage. You can, however, use different backends if you wish. Documentation for custom schema storage logic is coming soon.
+
+## How schemas work
+
+Pulsar schemas are applied and enforced *at the topic level* (schemas cannot be applied at the namespace or tenant level). Producers and consumers upload schemas to Pulsar brokers.
+
+Pulsar schemas are fairly simple data structures that consist of:
+
+* A **name**. In Pulsar, a schema's name is the topic to which the schema is applied.
+* A **payload**, which is a binary representation of the schema
+* A schema [**type**](#supported-schema-formats)
+* User-defined **properties** as a string/string map. Usage of properties is wholly application specific. Possible properties might be the Git hash associated with a schema, an environment like `dev` or `prod`, etc.
+
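+You can inspect these fields from any typed schema in the Java client. A small sketch, reusing the `SensorReading` class from the example in the next section:
+
+```java
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+// Inspect the data structure behind a JSON schema built from a POJO.
+SchemaInfo info = Schema.JSON(SensorReading.class).getSchemaInfo();
+System.out.println(info.getName());               // schema name
+System.out.println(info.getType());               // schema type, e.g. JSON
+System.out.println(new String(info.getSchema())); // payload: schema definition bytes
+System.out.println(info.getProperties());         // user-defined properties
+
+```
+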
+## Schema versions
+
+Each schema stored with a topic has a version. The schema version manages schema changes happening within a topic.
+
+Messages produced with a given schema are tagged with that schema version. Therefore, when a message is consumed by a Pulsar client, the client can use the schema version to retrieve the corresponding schema and deserialize the data.
+
+Schemas are versioned in succession. The schema is stored in a broker that handles the associated topics so that version assignments can be made.
+
+Once a version is assigned/fetched to/for a schema, all subsequent messages produced by that producer are tagged with the appropriate version.
+
+In order to illustrate how schema versioning works, let's walk through an example. Imagine that the Pulsar [Java client](client-libraries-java) created using the code below attempts to connect to Pulsar and begin sending messages:
+
+```java
+
+PulsarClient client = PulsarClient.builder()
+        .serviceUrl("pulsar://localhost:6650")
+        .build();
+
+Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class))
+        .topic("sensor-data")
+        .sendTimeout(3, TimeUnit.SECONDS)
+        .create();
+
+```
+
+The table below lists the possible scenarios when this connection attempt occurs and what will happen in light of each scenario:
+
+Scenario | What happens
+:--------|:------------
+No schema exists for the topic | The producer is created using the given schema. The schema is transmitted to the broker and stored (since no existing schema is "compatible" with the `SensorReading` schema). Any consumer created using the same schema/topic can consume messages from the `sensor-data` topic.
+A schema already exists; the producer connects using the same schema that's already stored | The schema is transmitted to the Pulsar broker. The broker determines that the schema is compatible. The broker attempts to store the schema in [BookKeeper](concepts-architecture-overview.md#persistent-storage) but then determines that it's already stored, so it's then used to tag produced messages.
+A schema already exists; the producer connects using a new schema that is compatible | The producer transmits the schema to the broker. The broker determines that the schema is compatible and stores the new schema as the current version (with a new version number).
+
+> Schemas are versioned in succession. Schema storage happens in the broker that handles the associated topic so that version assignments can be made. Once a version is assigned/fetched to/for a schema, all subsequent messages produced by that producer are tagged with the appropriate version.
+
+If you do not know the schema type of a Pulsar topic in advance, you can use AUTO schema to produce or consume generic records to or from brokers.
+
+- `AUTO_PRODUCE` schema helps a producer validate whether the bytes it sends are compatible with the schema of a topic. 
+- `AUTO_CONSUME` schema helps a consumer validate whether the bytes sent by a topic are compatible with the consumer; that is, the consumer deserializes messages into language-specific objects (such as `GenericRecord`) using the schema retrieved from the broker side.
+
+In `AUTO_CONSUME` mode, you can set the `useProvidedSchemaAsReaderSchema` flag to `false`, so that messages can be decoded based on the schema associated with each message.
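+
+In the Java client, the two modes might look like this sketch (the topic and subscription names are placeholders, and `client` is an existing `PulsarClient`):
+
+```java
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+
+// AUTO_PRODUCE: the producer sends raw bytes, which the broker-side schema validates.
+Producer<byte[]> producer = client.newProducer(Schema.AUTO_PRODUCE_BYTES())
+        .topic("sensor-data")
+        .create();
+
+// AUTO_CONSUME: messages are decoded into GenericRecord using the topic's schema.
+Consumer<GenericRecord> consumer = client.newConsumer(Schema.AUTO_CONSUME())
+        .topic("sensor-data")
+        .subscriptionName("auto-sub")
+        .subscribe();
+
+```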
+
+## Supported schema formats
+
+The following formats are supported by the Pulsar schema registry:
+
+* None. If no schema is specified for a topic, producers and consumers will handle raw bytes.
+* `String` (used for UTF-8-encoded strings)
+* [JSON](https://www.json.org/)
+* [Protobuf](https://developers.google.com/protocol-buffers/)
+* [Avro](https://avro.apache.org/)
+
+For usage instructions, see the documentation for your preferred client library:
+
+* [Java](client-libraries-java.md#schemas)
+
+> Support for other schema formats will be added in future releases of Pulsar.
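+
+As a quick illustration of a primitive schema, a producer can use the built-in string schema so the client handles UTF-8 encoding (a sketch; the topic name is a placeholder and `client` is an existing `PulsarClient`):
+
+```java
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+
+// The built-in string schema encodes and decodes UTF-8 strings for you.
+Producer<String> stringProducer = client.newProducer(Schema.STRING)
+        .topic("my-topic")
+        .create();
+stringProducer.send("a UTF-8 encoded string");
+
+```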
+
+The following example shows how to define an Avro schema using the `GenericSchemaBuilder`, generate a generic Avro schema using `GenericRecordBuilder`, and consume messages into `GenericRecord`.
+
+**Example** 
+
+1. Use the `RecordSchemaBuilder` to build a schema.
+
+   ```java
+   
+   RecordSchemaBuilder recordSchemaBuilder = SchemaBuilder.record("schemaName");
+   recordSchemaBuilder.field("intField").type(SchemaType.INT32);
+   SchemaInfo schemaInfo = recordSchemaBuilder.build(SchemaType.AVRO);
+
+   Producer<GenericRecord> producer = client.newProducer(Schema.generic(schemaInfo)).create();
+   
+   ```
+
+2. Use `RecordBuilder` to build the generic records.
+
+   ```java
+   
+   producer.newMessage().value(schema.newRecordBuilder()
+               .set("intField", 32)
+               .build()).send();
+   
+   ```
+
+## Managing Schemas
+
+You can use Pulsar admin tools to manage schemas for topics.
diff --git a/site2/website-next/versioned_docs/version-2.4.1/concepts-tiered-storage.md b/site2/website-next/versioned_docs/version-2.4.1/concepts-tiered-storage.md
new file mode 100644
index 0000000..e912d77
--- /dev/null
+++ b/site2/website-next/versioned_docs/version-2.4.1/concepts-tiered-storage.md
@@ -0,0 +1,18 @@
+---
+id: concepts-tiered-storage
+title: Tiered Storage
+sidebar_label: "Tiered Storage"
+original_id: concepts-tiered-storage
+---
+
+Pulsar's segment-oriented architecture allows for topic backlogs to grow very large, effectively without limit. However, this can become expensive over time.
+
+One way to alleviate this cost is to use Tiered Storage. With tiered storage, older messages in the backlog can be moved from BookKeeper to a cheaper storage mechanism, while still allowing clients to access the backlog as if nothing had changed.
+
+![Tiered Storage](/assets/pulsar-tiered-storage.png)
+
+> Data written to BookKeeper is replicated to 3 physical machines by default. However, once a segment is sealed in BookKeeper it becomes immutable and can be copied to long term storage. Long term storage can achieve cost savings by using mechanisms such as [Reed-Solomon error correction](https://en.wikipedia.org/wiki/Reed%E2%80%93Solomon_error_correction) to require fewer physical copies of data.
+
+Pulsar currently supports S3 and Google Cloud Storage (GCS) for [long term store](https://pulsar.apache.org/docs/en/cookbooks-tiered-storage/). Offloading to long-term storage is triggered via a REST API or command-line interface. The user passes in the amount of topic data they wish to retain on BookKeeper, and the broker copies the backlog data to long-term storage. The original data is then deleted from BookKeeper after a configured delay (4 hours by default).
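+
+With the Java admin client, an offload might be triggered as in the following sketch, assuming an offload driver is already configured for the cluster and that `client` and `admin` are existing `PulsarClient`/`PulsarAdmin` instances (the topic name is a placeholder):
+
+```java
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+
+String topic = "persistent://my-tenant/my-namespace/my-topic"; // placeholder
+
+Producer<byte[]> producer = client.newProducer().topic(topic).create();
+MessageId lastWritten = producer.send("data".getBytes());
+
+// Everything up to (and including) this message ID becomes eligible for offload.
+admin.topics().triggerOffload(topic, lastWritten);
+
+```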
+
+> For a guide for setting up tiered storage, see the [Tiered storage cookbook](cookbooks-tiered-storage).
diff --git a/site2/website-next/versioned_docs/version-2.4.1/concepts-topic-compaction.md b/site2/website-next/versioned_docs/version-2.4.1/concepts-topic-compaction.md
new file mode 100644
index 0000000..c85e703
--- /dev/null
+++ b/site2/website-next/versioned_docs/version-2.4.1/concepts-topic-compaction.md
@@ -0,0 +1,37 @@
+---
+id: concepts-topic-compaction
+title: Topic Compaction
+sidebar_label: "Topic Compaction"
+original_id: concepts-topic-compaction
+---
+
+Pulsar was built with highly scalable [persistent storage](concepts-architecture-overview.md#persistent-storage) of message data as a primary objective. Pulsar topics enable you to persistently store as many unacknowledged messages as you need while preserving message ordering. By default, Pulsar stores *all* unacknowledged/unprocessed messages produced on a topic. Accumulating many unacknowledged messages on a topic is necessary for many Pulsar use cases, but it can also be very time intensive for Pulsar consumers to "rewind" through the entire log of messages.
+
+> For a more practical guide to topic compaction, see the [Topic compaction cookbook](cookbooks-compaction).
+
+For some use cases consumers don't need a complete "image" of the topic log. They may only need a few values to construct a more "shallow" image of the log, perhaps even just the most recent value. For these kinds of use cases Pulsar offers **topic compaction**. When you run compaction on a topic, Pulsar goes through a topic's backlog and removes messages that are *obscured* by later messages, i.e. it goes through the topic on a per-key basis and leaves only the most recent message associated with each key.
+
+Pulsar's topic compaction feature:
+
+* Allows for faster "rewind" through topic logs
+* Applies only to [persistent topics](concepts-architecture-overview.md#persistent-storage)
+* Is triggered automatically when the backlog reaches a certain size, or manually via the command line. See the [Topic compaction cookbook](cookbooks-compaction)
+* Is conceptually and operationally distinct from [retention and expiry](concepts-messaging.md#message-retention-and-expiry). Topic compaction *does*, however, respect retention. If retention has removed a message from the message backlog of a topic, the message will also not be readable from the compacted topic ledger.
+
+> #### Topic compaction example: the stock ticker
+> An example use case for a compacted Pulsar topic would be a stock ticker topic. On a stock ticker topic, each message bears a timestamped dollar value for stocks for purchase (with the message key holding the stock symbol, e.g. `AAPL` or `GOOG`). With a stock ticker you may care only about the most recent value(s) of the stock and have no interest in historical data (i.e. you don't need to construct a complete image of the topic's sequence of messages per key). Compaction would be highly beneficial in this case because it would keep consumers from needing to rewind through obscured messages.
+
+
+## How topic compaction works
+
+When topic compaction is triggered [via the CLI](cookbooks-compaction), Pulsar will iterate over the entire topic from beginning to end. For each key that it encounters the compaction routine will keep a record of the latest occurrence of that key.
+
+After that, the broker will create a new [BookKeeper ledger](concepts-architecture-overview.md#ledgers) and make a second iteration through each message on the topic. For each message, if the key matches the latest occurrence of that key, then the key's data payload, message ID, and metadata will be written to the newly created ledger. If the key doesn't match the latest then the message will be skipped and left alone. If any given message has an empty payload, it will be skipped and considered deleted (akin to the concept of tombstones in key-value databases).
+
+After the initial compaction operation, the Pulsar [broker](reference-terminology.md#broker) that owns the topic is notified whenever any future changes are made to the compaction horizon and compacted backlog. When such changes occur:
+
+* Clients (consumers and readers) that have `readCompacted` enabled will attempt to read messages from a topic and either:
+  * Read from the topic like normal (if the message ID is greater than or equal to the compaction horizon) or
+  * Read beginning at the compaction horizon (if the message ID is lower than the compaction horizon)
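+
+With the Java clients, triggering compaction and reading the compacted view might look like this sketch (the topic and subscription names are placeholders; `client` and `admin` are existing `PulsarClient`/`PulsarAdmin` instances):
+
+```java
+
+import org.apache.pulsar.client.api.Consumer;
+
+String topic = "persistent://my-tenant/my-namespace/stock-ticker"; // placeholder
+
+// Ask the broker that owns the topic to run a compaction pass.
+admin.topics().triggerCompaction(topic);
+
+// Consumers opt in to the compacted view with readCompacted(true).
+Consumer<byte[]> consumer = client.newConsumer()
+        .topic(topic)
+        .subscriptionName("ticker-sub")
+        .readCompacted(true)
+        .subscribe();
+
+```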
+
+
diff --git a/site2/website-next/versioned_docs/version-2.4.1/getting-started-standalone.md b/site2/website-next/versioned_docs/version-2.4.1/getting-started-standalone.md
new file mode 100644
index 0000000..2403cb4
--- /dev/null
+++ b/site2/website-next/versioned_docs/version-2.4.1/getting-started-standalone.md
@@ -0,0 +1,266 @@
+---
+slug: /
+id: standalone
+title: Set up a standalone Pulsar locally
+sidebar_label: "Run Pulsar locally"
+original_id: standalone
+---
+
+For local development and testing, you can run Pulsar in standalone mode on your machine. The standalone mode includes a Pulsar broker and the necessary ZooKeeper and BookKeeper components, all running inside a single Java Virtual Machine (JVM) process.
+
+> #### Pulsar in production? 
+> If you're looking to run a full production Pulsar installation, see the [Deploying a Pulsar instance](deploy-bare-metal) guide.
+
+## Install Pulsar standalone
+
+This tutorial guides you through every step of the installation process.
+
+### System requirements
+
+Currently, Pulsar is available for 64-bit **macOS**, **Linux**, and **Windows**. To use Pulsar, you need to install 64-bit JRE/JDK 8 or later versions.
+
+:::tip
+
+By default, Pulsar allocates 2G of JVM heap memory to start. This can be changed in the `conf/pulsar_env.sh` file under `PULSAR_MEM`, which contains extra options passed to the JVM. 
+
+:::
+
+### Install Pulsar using binary release
+
+To get started with Pulsar, download a binary tarball release in one of the following ways:
+
+* download from the Apache mirror (<a href="pulsar:binary_release_url" download>Pulsar @pulsar:version@ binary release</a>)
+
+* download from the Pulsar [downloads page](pulsar:download_page_url)  
+  
+* download from the Pulsar [releases page](https://github.com/apache/pulsar/releases/latest)
+  
+* use [wget](https://www.gnu.org/software/wget):
+
+  ```shell
+  
+  $ wget pulsar:binary_release_url
+  
+  ```
+
+After you download the tarball, untar it and use the `cd` command to navigate to the resulting directory:
+
+```bash
+
+$ tar xvfz apache-pulsar-@pulsar:version@-bin.tar.gz
+$ cd apache-pulsar-@pulsar:version@
+
+```
+
+#### What your package contains
+
+The Pulsar binary package initially contains the following directories:
+
+Directory | Contains
+:---------|:--------
+`bin` | Pulsar's command-line tools, such as [`pulsar`](reference-cli-tools.md#pulsar) and [`pulsar-admin`](reference-pulsar-admin).
+`conf` | Configuration files for Pulsar, including [broker configuration](reference-configuration.md#broker), [ZooKeeper configuration](reference-configuration.md#zookeeper), and more.
+`examples` | A Java JAR file containing [Pulsar Functions](functions-overview) example.
+`lib` | The [JAR](https://en.wikipedia.org/wiki/JAR_(file_format)) files used by Pulsar.
+`licenses` | License files, in the`.txt` form, for various components of the Pulsar [codebase](https://github.com/apache/pulsar).
+
+These directories are created once you begin running Pulsar.
+
+Directory | Contains
+:---------|:--------
+`data` | The data storage directory used by ZooKeeper and BookKeeper.
+`instances` | Artifacts created for [Pulsar Functions](functions-overview).
+`logs` | Logs created by the installation.
+
+:::tip
+
+If you want to use builtin connectors and tiered storage offloaders, you can install them according to the following instructions:
+* [Install builtin connectors (optional)](#install-builtin-connectors-optional)
+* [Install tiered storage offloaders (optional)](#install-tiered-storage-offloaders-optional)
+Otherwise, skip this step and perform the next step [Start Pulsar standalone](#start-pulsar-standalone). Pulsar can be successfully installed without installing builtin connectors and tiered storage offloaders.
+
+:::
+
+### Install builtin connectors (optional)
+
+Since `2.1.0-incubating` release, Pulsar releases a separate binary distribution, containing all the `builtin` connectors.
+To enable those `builtin` connectors, you can download the connectors tarball release in one of the following ways:
+
+* download from the Apache mirror <a href="pulsar:connector_release_url" download>Pulsar IO Connectors @pulsar:version@ release</a>
+
+* download from the Pulsar [downloads page](pulsar:download_page_url)
+
+* download from the Pulsar [releases page](https://github.com/apache/pulsar/releases/latest)
+
+* use [wget](https://www.gnu.org/software/wget):
+
+  ```shell
+  
+  $ wget pulsar:connector_release_url/{connector}-@pulsar:version@.nar
+  
+  ```
+
+After you download the nar file, copy the file to the `connectors` directory in the pulsar directory. 
+For example, if you download the `pulsar-io-aerospike-@pulsar:version@.nar` connector file, enter the following commands:
+
+```bash
+
+$ mkdir connectors
+$ mv pulsar-io-aerospike-@pulsar:version@.nar connectors
+
+$ ls connectors
+pulsar-io-aerospike-@pulsar:version@.nar
+...
+
+```
+
+:::note
+
+* If you are running Pulsar in a bare metal cluster, make sure `connectors` tarball is unzipped in every pulsar directory of the broker
+(or in every pulsar directory of function-worker if you are running a separate worker cluster for Pulsar Functions).
+* If you are [running Pulsar in Docker](getting-started-docker.md) or deploying Pulsar using a docker image (e.g. [K8S](deploy-kubernetes.md) or [DCOS](deploy-dcos)),
+you can use the `apachepulsar/pulsar-all` image instead of the `apachepulsar/pulsar` image. `apachepulsar/pulsar-all` image has already bundled [all builtin connectors](io-overview.md#working-with-connectors).
+
+:::
+
+### Install tiered storage offloaders (optional)
+
+:::tip
+
+Since `2.2.0` release, Pulsar releases a separate binary distribution, containing the tiered storage offloaders.
+To enable tiered storage feature, follow the instructions below; otherwise skip this section.
+
+:::
+
+To get started with [tiered storage offloaders](concepts-tiered-storage), you need to download the offloaders tarball release on every broker node in one of the following ways:
+
+* download from the Apache mirror <a href="pulsar:offloader_release_url" download>Pulsar Tiered Storage Offloaders @pulsar:version@ release</a>
+
+* download from the Pulsar [downloads page](pulsar:download_page_url)
+
+* download from the Pulsar [releases page](https://github.com/apache/pulsar/releases/latest)
+
+* use [wget](https://www.gnu.org/software/wget):
+
+  ```shell
+  
+  $ wget pulsar:offloader_release_url
+  
+  ```
+
+After you download the tarball, untar the offloaders package and copy the offloaders as `offloaders`
+in the pulsar directory:
+
+```bash
+
+$ tar xvfz apache-pulsar-offloaders-@pulsar:version@-bin.tar.gz
+
+// you will find a directory named `apache-pulsar-offloaders-@pulsar:version@` in the pulsar directory
+// then copy the offloaders
+
+$ mv apache-pulsar-offloaders-@pulsar:version@/offloaders offloaders
+
+$ ls offloaders
+tiered-storage-jcloud-@pulsar:version@.nar
+
+```
+
+For more information on how to configure tiered storage, see [Tiered storage cookbook](cookbooks-tiered-storage).
+
+:::note
+
+* If you are running Pulsar in a bare metal cluster, make sure that `offloaders` tarball is unzipped in every broker's pulsar directory.
+* If you are [running Pulsar in Docker](getting-started-docker.md) or deploying Pulsar using a docker image (e.g. [K8S](deploy-kubernetes.md) or [DCOS](deploy-dcos)),
+you can use the `apachepulsar/pulsar-all` image instead of the `apachepulsar/pulsar` image. `apachepulsar/pulsar-all` image has already bundled tiered storage offloaders.
+
+:::
+
+## Start Pulsar standalone
+
+Once you have an up-to-date local copy of the release, you can start a local cluster using the [`pulsar`](reference-cli-tools.md#pulsar) command, which is stored in the `bin` directory, and specifying that you want to start Pulsar in standalone mode.
+
+```bash
+
+$ bin/pulsar standalone
+
+```
+
+If you have started Pulsar successfully, you will see `INFO`-level log messages like this:
+
+```bash
+
+2017-06-01 14:46:29,192 - INFO  - [main:WebSocketService@95] - Configuration Store cache started
+2017-06-01 14:46:29,192 - INFO  - [main:AuthenticationService@61] - Authentication is disabled
+2017-06-01 14:46:29,192 - INFO  - [main:WebSocketService@108] - Pulsar WebSocket Service started
+
+```
+
+:::tip
+
+* The service is running on your terminal, which is under your direct control. If you need to run other commands, open a new terminal window.  
+
+:::
+
+You can also run the service as a background process using the `pulsar-daemon start standalone` command. For more information, see [pulsar-daemon](https://pulsar.apache.org/docs/en/reference-cli-tools/#pulsar-daemon).
+
+> * By default, there is no encryption, authentication, or authorization configured. Apache Pulsar can be accessed from a remote server without any authorization. Check the [Security Overview](security-overview) document to secure your deployment.
+>
+> * When you start a local standalone cluster, a `public/default` [namespace](concepts-messaging.md#namespaces) is created automatically. The namespace is used for development purposes. All Pulsar topics are managed within namespaces. For more information, see [Topics](concepts-messaging.md#topics).
+
+## Use Pulsar standalone
+
+Pulsar provides a CLI tool called [`pulsar-client`](reference-cli-tools.md#pulsar-client). The pulsar-client tool enables you to consume and produce messages to a Pulsar topic in a running cluster. 
+
+### Consume a message
+
+The following command consumes a message with the subscription name `first-subscription` from the `my-topic` topic:
+
+```bash
+
+$ bin/pulsar-client consume my-topic -s "first-subscription"
+
+```
+
+If the message has been successfully consumed, you will see a confirmation like the following in the `pulsar-client` logs:
+
+```
+
+09:56:55.566 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.MultiTopicsConsumerImpl - [TopicsConsumerFakeTopicNamee2df9] [first-subscription] Success subscribe new topic my-topic in topics consumer, partitions: 4, allTopicPartitionsNumber: 4
+
+```
+
+:::tip
+
+As you may have noticed, we did not explicitly create the `my-topic` topic from which we consume the message. When you consume a message from a topic that does not yet exist, Pulsar creates that topic for you automatically. Producing a message to a topic that does not exist will automatically create that topic for you as well.
+
+:::
+
+### Produce a message
+
+The following command produces a message saying `hello-pulsar` to the `my-topic` topic:
+
+```bash
+
+$ bin/pulsar-client produce my-topic --messages "hello-pulsar"
+
+```
+
+If the message has been successfully published to the topic, you will see a confirmation like the following in the `pulsar-client` logs:
+
+```
+
+13:09:39.356 [main] INFO  org.apache.pulsar.client.cli.PulsarClientTool - 1 messages successfully produced
+
+```
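+
+You can drive the same produce/consume flow from the [Java client](client-libraries-java.md). The following is a minimal sketch, assuming the Pulsar Java client library is on your classpath:
+
+```java
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+
+public class StandaloneQuickstart {
+    public static void main(String[] args) throws Exception {
+        // The standalone broker listens on localhost:6650 by default.
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl("pulsar://localhost:6650")
+                .build();
+
+        Consumer<byte[]> consumer = client.newConsumer()
+                .topic("my-topic")
+                .subscriptionName("first-subscription")
+                .subscribe();
+
+        Producer<byte[]> producer = client.newProducer()
+                .topic("my-topic")
+                .create();
+        producer.send("hello-pulsar".getBytes());
+
+        Message<byte[]> msg = consumer.receive();
+        System.out.println(new String(msg.getData()));
+        consumer.acknowledge(msg);
+
+        client.close();
+    }
+}
+
+```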
+
+## Stop Pulsar standalone
+
+Press `Ctrl+C` to stop a local standalone Pulsar.
+
+:::tip
+
+If the service runs as a background process using the `pulsar-daemon start standalone` command, then use the `pulsar-daemon stop standalone`  command to stop the service.
+For more information, see [pulsar-daemon](https://pulsar.apache.org/docs/en/reference-cli-tools/#pulsar-daemon).
+
+:::
+
diff --git a/site2/website-next/versioned_docs/version-2.4.1/pulsar-2.0.md b/site2/website-next/versioned_docs/version-2.4.1/pulsar-2.0.md
new file mode 100644
index 0000000..11c5e66
--- /dev/null
+++ b/site2/website-next/versioned_docs/version-2.4.1/pulsar-2.0.md
@@ -0,0 +1,72 @@
+---
+id: pulsar-2.0
+title: Pulsar 2.0
+sidebar_label: "Pulsar 2.0"
+original_id: pulsar-2.0
+---
+
+Pulsar 2.0 is a major new release for Pulsar that brings some bold changes to the platform, including [simplified topic names](#topic-names), the addition of the [Pulsar Functions](functions-overview) feature, some terminology changes, and more.
+
+## New features in Pulsar 2.0
+
+Feature | Description
+:-------|:-----------
+[Pulsar Functions](functions-overview) | A lightweight compute option for Pulsar
+
+## Major changes
+
+There are a few major changes that you should be aware of, as they may significantly impact your day-to-day usage.
+
+### Properties versus tenants
+
+Previously, Pulsar had a concept of properties. A property is essentially the exact same thing as a tenant, so the "property" terminology has been removed in version 2.0. The [`pulsar-admin properties`](reference-pulsar-admin.md#pulsar-admin) command-line interface, for example, has been replaced with the [`pulsar-admin tenants`](reference-pulsar-admin.md#pulsar-admin-tenants) interface. In some cases the properties terminology is still used, but it is now considered deprecated and will be removed in a future release.
+
+### Topic names
+
+Prior to version 2.0, *all* Pulsar topics had the following form:
+
+```http
+
+{persistent|non-persistent}://property/cluster/namespace/topic
+
+```
+
+Several important changes have been made in Pulsar 2.0:
+
+* There is no longer a [cluster component](#no-cluster)
+* Properties have been [renamed to tenants](#tenants)
+* You can use a [flexible](#flexible-topic-naming) naming system to shorten many topic names
+* `/` is not allowed in topic names
+
+#### No cluster component
+
+The cluster component has been removed from topic names. Thus, all topic names now have the following form:
+
+```http
+
+{persistent|non-persistent}://tenant/namespace/topic
+
+```
+
+> Existing topics that use the legacy name format will continue to work without any change, and there are no plans to change that.
+
+
+#### Flexible topic naming
+
+All topic names in Pulsar 2.0 internally have the form shown [above](#no-cluster-component) but you can now use shorthand names in many cases (for the sake of simplicity). The flexible naming system stems from the fact that there is now a default topic type, tenant, and namespace:
+
+Topic aspect | Default
+:------------|:-------
+topic type | `persistent`
+tenant | `public`
+namespace | `default`
+
+The table below shows some example topic name translations that use implicit defaults:
+
+Input topic name | Translated topic name
+:----------------|:---------------------
+`my-topic` | `persistent://public/default/my-topic`
+`my-tenant/my-namespace/my-topic` | `persistent://my-tenant/my-namespace/my-topic`
+
+> For [non-persistent topics](concepts-messaging.md#non-persistent-topics) you'll need to continue to specify the entire topic name, as the default-based rules for persistent topic names don't apply. Thus you cannot use a shorthand name like `non-persistent://my-topic` and would need to use `non-persistent://public/default/my-topic` instead
+
diff --git a/site2/website-next/versioned_docs/version-2.4.1/schema-evolution-compatibility.md b/site2/website-next/versioned_docs/version-2.4.1/schema-evolution-compatibility.md
new file mode 100644
index 0000000..17175f5
--- /dev/null
+++ b/site2/website-next/versioned_docs/version-2.4.1/schema-evolution-compatibility.md
@@ -0,0 +1,169 @@
+---
+id: schema-evolution-compatibility
+title: Schema evolution and compatibility
+sidebar_label: "Schema evolution and compatibility"
+original_id: schema-evolution-compatibility
+---
+
+Normally, schemas do not stay the same over a long period of time. Instead, they undergo evolutions to satisfy new needs. 
+
+This chapter examines how Pulsar schema evolves and what Pulsar schema compatibility check strategies are.
+
+## Schema evolution
+
+Pulsar schema is defined in a data structure called `SchemaInfo`. 
+
+Each `SchemaInfo` stored with a topic has a version. The version is used to manage the schema changes happening within a topic. 
+
+The message produced with `SchemaInfo` is tagged with a schema version. When a message is consumed by a Pulsar client, the Pulsar client can use the schema version to retrieve the corresponding `SchemaInfo` and use the correct schema information to deserialize data.
+
+### What is schema evolution?
+
+Schemas store the details of attributes and types. To satisfy new business requirements, you inevitably need to update schemas over time, which is called **schema evolution**. 
+
+Any schema changes affect downstream consumers. Schema evolution ensures that the downstream consumers can seamlessly handle data encoded with both old schemas and new schemas. 
+
+### How should Pulsar schemas evolve? 
+
+The answer is the Pulsar schema compatibility check strategy, which determines how a broker compares new schemas with a topic's existing schemas.
+
+For more information, see [Schema compatibility check strategy](#schema-compatibility-check-strategy).
+
+### How does Pulsar support schema evolution?
+
+1. When a producer/consumer/reader connects to a broker, the broker deploys the schema compatibility checker configured by `schemaRegistryCompatibilityCheckers` to enforce the schema compatibility check. 
+
+   The schema compatibility checker is one instance per schema type. 
+   
+   Currently, Avro and JSON have their own compatibility checkers, while all the other schema types share the default compatibility checker which disables schema evolution.
+
+2. The producer/consumer/reader sends its client `SchemaInfo` to the broker. 
+   
+3. The broker knows the schema type and locates the schema compatibility checker for that type. 
+
+4. The broker uses the checker to check if the `SchemaInfo` is compatible with the latest schema of the topic by applying its compatibility check strategy. 
+   
+   Currently, the compatibility check strategy is configured at the namespace level and applied to all the topics within that namespace.
+
+## Schema compatibility check strategy
+
+Pulsar has 8 schema compatibility check strategies, which are summarized in the following table.
+
+Suppose that you have a topic containing three schemas (V1, V2, and V3), V1 is the oldest and V3 is the latest:
+
+|  Compatibility check strategy  |   Definition  |   Changes allowed  |   Check against which schema  |   Upgrade first  | 
+| --- | --- | --- | --- | --- |
+|  `ALWAYS_COMPATIBLE`  |   Disable schema compatibility check.  |   All changes are allowed  |   All previous versions  |   Any order  | 
+|  `ALWAYS_INCOMPATIBLE`  |   Disable schema evolution.  |   All changes are disabled  |   None  |   None  | 
+|  `BACKWARD`  |   Consumers using the schema V3 can process data written by producers using the schema V3 or V2.  |   <li>Add optional fields </li><li>Delete fields </li> |   Latest version  |   Consumers  | 
+|  `BACKWARD_TRANSITIVE`  |   Consumers using the schema V3 can process data written by producers using the schema V3, V2 or V1.  |   <li>Add optional fields </li><li>Delete fields </li> |   All previous versions  |   Consumers  | 
+|  `FORWARD`  |   Consumers using the schema V3 or V2 can process data written by producers using the schema V3.  |   <li>Add fields </li><li>Delete optional fields </li> |   Latest version  |   Producers  | 
+|  `FORWARD_TRANSITIVE`  |   Consumers using the schema V3, V2 or V1 can process data written by producers using the schema V3.  |   <li>Add fields </li><li>Delete optional fields </li> |   All previous versions  |   Producers  | 
+|  `FULL`  |   Backward and forward compatible between the schema V3 and V2.  |   <li>Modify optional fields </li> |   Latest version  |   Any order  | 
+|  `FULL_TRANSITIVE`  |   Backward and forward compatible among the schema V3, V2, and V1.  |   <li>Modify optional fields </li> |   All previous versions  |   Any order  | 
+
+### ALWAYS_COMPATIBLE and ALWAYS_INCOMPATIBLE 
+
+|  Compatibility check strategy  |   Definition  |   Note  | 
+| --- | --- | --- |
+|  `ALWAYS_COMPATIBLE`  |   Disable schema compatibility check.  |   None  | 
+|  `ALWAYS_INCOMPATIBLE`  |   Disable schema evolution, that is, any schema change is rejected.  |   <li>For all schema types except Avro and JSON, the default schema compatibility check strategy is `ALWAYS_INCOMPATIBLE`. </li><li>For Avro and JSON, the default schema compatibility check strategy is `FULL`. </li> | 
+
+#### Example 
+  
+* Example  1
+  
+  In some situations, an application needs to store events of several different types in the same Pulsar topic. 
+
+  In particular, when developing a data model in an `Event Sourcing` style, you might have several kinds of events that affect the state of an entity. 
+
+  For example, for a user entity, there are `userCreated`, `userAddressChanged` and `userEnquiryReceived` events. The application requires that those events are always read in the same order. 
+
+  Consequently, those events need to go in the same Pulsar partition to maintain order. This application can use `ALWAYS_COMPATIBLE` to allow different kinds of events to co-exist in the same topic.
+
+* Example 2
+
+  Sometimes we also make incompatible changes. 
+
+  For example, you are modifying a field type from `string` to `int`.
+
+  In this case, you need to:
+
+  * Upgrade all producers and consumers to the new schema versions at the same time.
+
+  * Optionally, create a new topic and start migrating applications to use the new topic and the new schema, avoiding the need to handle two incompatible versions in the same topic.
+
+### BACKWARD and BACKWARD_TRANSITIVE 
+
+Suppose that you have a topic containing three schemas (V1, V2, and V3), V1 is the oldest and V3 is the latest:
+
+| Compatibility check strategy | Definition  | Description |
+|---|---|---|
+`BACKWARD` | Consumers using the new schema can process data written by producers using the **last schema**. | The consumers using the schema V3 can process data written by producers using the schema V3 or V2. |
+`BACKWARD_TRANSITIVE` | Consumers using the new schema can process data written by producers using **all previous schemas**. | The consumers using the schema V3 can process data written by producers using the schema V3, V2, or V1. |
+
+#### Example  
+  
+* Example 1
+  
+  Remove a field.
+  
+  A consumer constructed to process events without one field can process events written with the old schema containing the field, and the consumer will ignore that field.
+
+* Example 2
+  
+  You want to load all Pulsar data into a Hive data warehouse and run SQL queries against the data. 
+
+  The same SQL queries must continue to work even when the data changes. To support this, you can evolve the schemas using the `BACKWARD` strategy.
+
+### FORWARD and FORWARD_TRANSITIVE 
+
+Suppose that you have a topic containing three schemas (V1, V2, and V3), V1 is the oldest and V3 is the latest:
+
+| Compatibility check strategy | Definition | Description |
+|---|---|---|
+`FORWARD` | Consumers using the **last schema** can process data written by producers using a new schema, even though they may not be able to use the full capabilities of the new schema. | The consumers using the schema V3 or V2 can process data written by producers using the schema V3. |
+`FORWARD_TRANSITIVE` | Consumers using **all previous schemas** can process data written by producers using a new schema. | The consumers using the schema V3, V2, or V1 can process data written by producers using the schema V3. 
+
+#### Example  
+  
+* Example 1
+  
+  Add a field.
+  
+  In most data formats, consumers written to process events without new fields can continue doing so even when they receive new events containing new fields.
+
+* Example 2
+  
+  If a consumer has application logic tied to a full version of a schema, the application logic may not be updated instantly when the schema evolves.
+  
+  In this case, you need to project data with a new schema onto an old schema that the application understands. 
+  
+  Consequently, you can evolve the schemas using the `FORWARD` strategy to ensure that the old schema can process data encoded with the new schema.
+
+### FULL and FULL_TRANSITIVE 
+
+Suppose that you have a topic containing three schemas (V1, V2, and V3), V1 is the oldest and V3 is the latest:
+
+|  Compatibility check strategy  |   Definition  |   Description  |   Note  | 
+| --- | --- | --- | --- |
+|  `FULL`  |   Schemas are both backward and forward compatible, which means: Consumers using the last schema can process data written by producers using the new schema. AND Consumers using the new schema can process data written by producers using the last schema.  |   Consumers using the schema V3 can process data written by producers using the schema V3 or V2. AND Consumers using the schema V3 or V2 can process data written by producers using the schema V3.  |   <li>For Avro and JSON, the default schema compatibility check strategy is `FULL`. </li> | 
+|  `FULL_TRANSITIVE`  |   The new schema is backward and forward compatible with all previously registered schemas.  |   Consumers using the schema V3 can process data written by producers using the schema V3, V2 or V1. AND Consumers using the schema V3, V2 or V1 can process data written by producers using the schema V3.  |   None  | 
+
+#### Example  
+
+In some data formats, for example, Avro, you can define fields with default values. Consequently, adding or removing a field with a default value is a fully compatible change.
+
+## Order of upgrading clients
+
+The order of upgrading client applications is determined by the compatibility check strategy.
+
+For example, consider producers using schemas to write data to Pulsar and consumers using schemas to read data from Pulsar. 
+
+|  Compatibility check strategy  |   Upgrade first  |   Description  | 
+| --- | --- | --- |
+|  `ALWAYS_COMPATIBLE`  |   Any order  |   The compatibility check is disabled. Consequently, you can upgrade the producers and consumers in **any order**.  | 
+|  `ALWAYS_INCOMPATIBLE`  |   None  |   The schema evolution is disabled.  | 
+|  <li>`BACKWARD` </li><li>`BACKWARD_TRANSITIVE` </li> |   Consumers  |   There is no guarantee that consumers using the old schema can read data produced using the new schema. Consequently, **upgrade all consumers first**, and then start producing new data.  | 
+|  <li>`FORWARD` </li><li>`FORWARD_TRANSITIVE` </li> |   Producers  |   There is no guarantee that consumers using the new schema can read data produced using the old schema. Consequently, **upgrade all producers first** to use the new schema, ensure that the data already produced using the old schemas are not available to consumers, and then upgrade the consumers.  | 
+|  <li>`FULL` </li><li>`FULL_TRANSITIVE` </li> |   Any order  |   There is no guarantee that consumers using the old schema can read data produced using the new schema and consumers using the new schema can read data produced using the old schema. Consequently, you can upgrade the producers and consumers in **any order**.  | 
diff --git a/site2/website-next/versioned_docs/version-2.4.1/schema-get-started.md b/site2/website-next/versioned_docs/version-2.4.1/schema-get-started.md
new file mode 100644
index 0000000..f569471
--- /dev/null
+++ b/site2/website-next/versioned_docs/version-2.4.1/schema-get-started.md
@@ -0,0 +1,98 @@
+---
+id: schema-get-started
+title: Get started
+sidebar_label: "Get started"
+original_id: schema-get-started
+---
+
+This chapter introduces Pulsar schemas and explains why they are important. 
+
+## Schema Registry
+
+Type safety is extremely important in any application built around a message bus like Pulsar. 
+
+Producers and consumers need some kind of mechanism for coordinating types at the topic level to avoid various potential problems, for example, serialization and deserialization issues. 
+
+Applications typically adopt one of the following approaches to guarantee type safety in messaging. Both approaches are available in Pulsar, and you're free to adopt one or the other or to mix and match on a per-topic basis.
+
+### Client-side approach
+
+Producers and consumers are responsible for not only serializing and deserializing messages (which consist of raw bytes) but also "knowing" which types are being transmitted via which topics. 
+
+If a producer is sending temperature sensor data on the topic `topic-1`, consumers of that topic will run into trouble if they attempt to parse that data as moisture sensor readings.
+
+Producers and consumers can send and receive messages consisting of raw byte arrays and leave all type safety enforcement to the application on an "out-of-band" basis.
+
+### Server-side approach 
+
+Producers and consumers inform the system which data types can be transmitted via the topic. 
+
+With this approach, the messaging system enforces type safety and ensures that producers and consumers remain synced.
+
+Pulsar has a built-in **schema registry** that enables clients to upload data schemas on a per-topic basis. Those schemas dictate which data types are recognized as valid for that topic.
+
+## Why use schema
+
+When a schema is not enabled, Pulsar does not parse data; it takes bytes as inputs and sends bytes as outputs. While data has meaning beyond bytes, you need to parse the data yourself, and parse exceptions mainly occur in the following situations:
+
+* The field does not exist
+
+* The field type has changed (for example, `string` is changed to `int`)
+
+There are a few ways to prevent and overcome these exceptions. For example, you can catch exceptions when parsing errors occur, but this makes code hard to maintain. Alternatively, you can adopt a schema management system that performs schema evolution without breaking downstream applications and enforces type safety, as far as possible, in the language you are using. That is the solution Pulsar Schema provides.
+
+Pulsar schema enables you to use language-specific types of data when constructing and handling messages from simple types like `string` to more complex application-specific types. 
+
+**Example** 
+
+You can use the _User_ class to define the messages sent to Pulsar topics.
+
+```java
+
+public class User {
+    String name;
+    int age;
+}
+
+```
+
+When constructing a producer with the _User_ class, you can specify a schema or not as below.
+
+### Without schema
+
+If you construct a producer without specifying a schema, then the producer can only produce messages of type `byte[]`. If you have a POJO class, you need to serialize the POJO into bytes before sending messages.
+
+**Example**
+
+```java
+
+Producer<byte[]> producer = client.newProducer()
+        .topic(topic)
+        .create();
+User user = new User("Tom", 28);
+byte[] message = … // serialize the `user` by yourself;
+producer.send(message);
+
+```
+
+### With schema
+
+If you construct a producer with a specified schema, then you can send a class to a topic directly without worrying about how to serialize POJOs into bytes. 
+
+**Example**
+
+This example constructs a producer with the _JSONSchema_, and you can send the _User_ class to topics directly without worrying about how to serialize it into bytes. 
+
+```java
+
+Producer<User> producer = client.newProducer(JSONSchema.of(User.class))
+        .topic(topic)
+        .create();
+User user = new User("Tom", 28);
+producer.send(user);
+
+```
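+
+A matching consumer can receive the same messages as typed objects. A sketch, with a hypothetical subscription name:
+
+```java
+
+Consumer<User> consumer = client.newConsumer(JSONSchema.of(User.class))
+        .topic(topic)
+        .subscriptionName("user-sub") // placeholder subscription name
+        .subscribe();
+Message<User> message = consumer.receive();
+User received = message.getValue(); // deserialized back into a User by the schema
+consumer.acknowledge(message);
+
+```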
+
+### Summary
+
+When constructing a producer with a schema, you do not need to serialize messages into bytes; instead, Pulsar schema does this job in the background.
diff --git a/site2/website-next/versioned_docs/version-2.4.1/schema-manage.md b/site2/website-next/versioned_docs/version-2.4.1/schema-manage.md
new file mode 100644
index 0000000..d2f2443
--- /dev/null
+++ b/site2/website-next/versioned_docs/version-2.4.1/schema-manage.md
@@ -0,0 +1,602 @@
+---
+id: schema-manage
+title: Manage schema
+sidebar_label: "Manage schema"
+original_id: schema-manage
+---
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+
+This guide demonstrates the ways to manage schemas:
+
+* Automatically 
+  
+  * [Schema AutoUpdate](#schema-autoupdate)
+
+* Manually
+  
+  * [Schema manual management](#schema-manual-management)
+  
+  * [Custom schema storage](#custom-schema-storage)
+
+## Schema AutoUpdate
+
+By default, if a schema passes the schema compatibility check, the Pulsar producer automatically registers it with the topic it produces to. 
+
+### AutoUpdate for producer
+
+For a producer, the `AutoUpdate` happens in the following cases:
+
+* If a **topic doesn’t have a schema**, Pulsar registers a schema automatically.
+
+* If a **topic has a schema**:
+
+  * If a **producer doesn’t carry a schema**:
+
+    * If `isSchemaValidationEnforced` or `schemaValidationEnforced` is **disabled** in the namespace to which the topic belongs, the producer is allowed to connect to the topic and produce data. 
+
+    * If `isSchemaValidationEnforced` or `schemaValidationEnforced` is **enabled** in the namespace to which the topic belongs, the producer is rejected and disconnected.
+
+  * If a **producer carries a schema**:
+
+    A broker performs the compatibility check based on the configured compatibility check strategy of the namespace to which the topic belongs. 
+
+    * If the schema is new and passes the compatibility check, the broker registers it automatically for the topic.
+
+    * If the schema does not pass the compatibility check, the broker does not register the schema.
+
+![AutoUpdate Producer](/assets/schema-autoupdate-producer.png)
+
+### AutoUpdate for consumer
+
+For a consumer, the `AutoUpdate` happens in the following cases:
+
+* If a **consumer connects to a topic without a schema** (which means the consumer receives raw bytes), the consumer can connect to the topic successfully without doing any compatibility check.
+
+* If a **consumer connects to a topic with a schema**:
+
+  * If the **topic is idle** (no producers, no entries, no other consumers and no registered schemas), the broker registers a schema for the topic automatically.
+
+  * If the **topic is not idle**, the broker verifies whether the schema provided by the consumer is compatible with the registered schema of the topic. 
+
+    * If the **schema passes the compatibility check**, the consumer can connect to the topic and receive messages. 
+
+    * If the **schema does not pass the compatibility check**, the consumer is rejected and disconnected.
+
+![AutoUpdate Consumer](/assets/schema-autoupdate-consumer.png)
+
+### Manage AutoUpdate strategy
+
+You can use the `pulsar-admin` command to manage the `AutoUpdate` strategy as below:
+
+* [Disable AutoUpdate](#disable-autoupdate)
+
+* [Adjust compatibility](#adjust-compatibility)
+
+#### Disable AutoUpdate 
+
+To disable `AutoUpdate` on a namespace, you can use the `pulsar-admin` command.
+
+```bash
+
+bin/pulsar-admin namespaces set-schema-autoupdate-strategy --disabled tenant/namespace
+
+```
+
+Once the `AutoUpdate` is disabled, you can only register a new schema using the `pulsar-admin` command.
+
+#### Adjust compatibility
+
+To adjust the schema compatibility level on a namespace, you can use the `pulsar-admin` command.
+
+```bash
+
+bin/pulsar-admin namespaces set-schema-autoupdate-strategy --compatibility <compatibility-level> tenant/namespace
+
+```
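+
+The same strategy can be set programmatically. The following is a sketch using the Java admin client (the service URL and namespace name are illustrative); `AutoUpdateDisabled` is the strategy value behind the `--disabled` flag above:
+
+```java
+
+PulsarAdmin admin = PulsarAdmin.builder()
+        .serviceHttpUrl("http://localhost:8080")
+        .build();
+
+// Equivalent of --compatibility BACKWARD
+admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(
+        "my-tenant/my-ns", SchemaAutoUpdateCompatibilityStrategy.Backward);
+
+// Equivalent of --disabled
+admin.namespaces().setSchemaAutoUpdateCompatibilityStrategy(
+        "my-tenant/my-ns", SchemaAutoUpdateCompatibilityStrategy.AutoUpdateDisabled);
+
+```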
+
+### Schema validation
+
+By default, `schemaValidationEnforced` is **disabled** for producers:
+
+* This means a producer without a schema can produce any kind of messages to a topic with schemas, which may result in producing trash data to the topic. 
+
+* This allows non-Java clients that don’t support schema to produce messages to a topic with schemas.
+
+However, if you want a stronger guarantee on the topics with schemas, you can enable `schemaValidationEnforced` across the whole cluster or on a per-namespace basis.
+
+#### Enable schema validation
+
+To enable `schemaValidationEnforced` on a namespace, you can use the `pulsar-admin` command.
+
+```bash
+
+bin/pulsar-admin namespaces set-schema-validation-enforce --enable tenant/namespace
+
+```
+
+#### Disable schema validation
+
+To disable `schemaValidationEnforced` on a namespace, you can use the `pulsar-admin` command.
+
+```bash
+
+bin/pulsar-admin namespaces set-schema-validation-enforce --disable tenant/namespace
+
+```
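+
+The same flag can be toggled through the Java admin client; a minimal sketch (the namespace name is illustrative):
+
+```java
+
+// Pass true to enable, or false to disable, schemaValidationEnforced.
+admin.namespaces().setSchemaValidationEnforced("my-tenant/my-ns", true);
+
+```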
+
+## Schema manual management
+
+To manage schemas, you can use one of the following methods.
+
+| Method |  Description | 
+| --- | --- |
+|  **Admin CLI**  |   You can use the `pulsar-admin` tool to manage Pulsar schemas, brokers, clusters, sources, sinks, topics, tenants, and so on. For more information about how to use the `pulsar-admin` tool, see [here](reference-pulsar-admin).  | 
+|  **REST API**  |   Pulsar exposes schema-related management APIs in Pulsar’s admin RESTful API. You can access the admin RESTful endpoints directly to manage schemas. For more information about how to use the Pulsar REST API, see [here](http://pulsar.apache.org/admin-rest-api/).  | 
+|  **Java Admin API**  |  Pulsar provides a Java admin library. | 
+
+### Upload a schema
+
+To upload (register) a new schema for a topic, you can use one of the following methods.
+
+<Tabs 
+  defaultValue="Admin CLI"
+  values={[{"label":"Admin CLI","value":"Admin CLI"},{"label":"REST API","value":"REST API"},{"label":"Java Admin API","value":"Java Admin API"}]}>
+
+<TabItem value="Admin CLI">
+
+Use the `upload` subcommand.
+
+```bash
+
+$ pulsar-admin schemas upload --filename <schema-definition-file> <topic-name>
+
+```
+
+The `schema-definition-file` is in JSON format. 
+
+```json
+
+{
+    "type": "<schema-type>",
+    "schema": "<an-utf8-encoded-string-of-schema-definition-data>",
+    "properties": {} // the properties associated with the schema
+}
+
+```
+
+The `schema-definition-file` includes the following fields:
+
+| Field |  Description | 
+| --- | --- |
+|  `type`  |   The schema type. | 
+|  `schema`  |   The schema definition data, which is encoded in UTF-8 charset. <li>If the schema is a **primitive** schema, this field should be blank. </li><li>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition. </li> | 
+|  `properties`  |  The additional properties associated with the schema. | 
+
+Here are examples of the `schema-definition-file` for a JSON schema.
+
+**Example 1**
+
+```json
+
+{
+    "type": "JSON",
+    "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"file1\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file2\",\"type\":\"string\",\"default\":null},{\"name\":\"file3\",\"type\":[\"null\",\"string\"],\"default\":\"dfdf\"}]}",
+    "properties": {}
+}
+
+```
+
+**Example 2**
+
+```json
+
+{
+    "type": "STRING",
+    "schema": "",
+    "properties": {
+       "key1": "value1"
+    }
+}
+
+```
+
+</TabItem>
+<TabItem value="REST API">
+
+Send a `POST` request to this endpoint: {@inject: endpoint|POST|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/uploadSchema?version=@pulsar:version_number@}
+
+The post payload is in JSON format.
+
+```json
+
+{
+    "type": "<schema-type>",
+    "schema": "<an-utf8-encoded-string-of-schema-definition-data>",
+    "properties": {} // the properties associated with the schema
+}
+
+```
+
+The post payload includes the following fields:
+
+| Field |  Description | 
+| --- | --- |
+|  `type`  |   The schema type. | 
+|  `schema`  |   The schema definition data, which is encoded in UTF-8 charset. <li>If the schema is a **primitive** schema, this field should be blank. </li><li>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition. </li> | 
+|  `properties`  |  The additional properties associated with the schema. |
+
+</TabItem>
+<TabItem value="Java Admin API">
+
+```java
+
+void createSchema(String topic, PostSchemaPayload schemaPayload)
+
+```
+
+The `PostSchemaPayload` includes the following fields:
+
+| Field |  Description | 
+| --- | --- |
+|  `type`  |   The schema type. | 
+|  `schema`  |   The schema definition data, which is encoded in UTF-8 charset. <li>If the schema is a **primitive** schema, this field should be blank. </li><li>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition. </li> | 
+|  `properties`  |  The additional properties associated with the schema. | 
+
+Here is an example of `PostSchemaPayload`:
+
+```java
+
+PulsarAdmin admin = …;
+
+PostSchemaPayload payload = new PostSchemaPayload();
+payload.setType("INT8");
+payload.setSchema("");
+
+admin.createSchema("my-tenant/my-ns/my-topic", payload);
+
+```
+
+</TabItem>
+
+</Tabs>
+
+### Get a schema (latest)
+
+To get the latest schema for a topic, you can use one of the following methods. 
+
+<Tabs 
+  defaultValue="Admin CLI"
+  values={[{"label":"Admin CLI","value":"Admin CLI"},{"label":"REST API","value":"REST API"},{"label":"Java Admin API","value":"Java Admin API"}]}>
+
+<TabItem value="Admin CLI">
+
+Use the `get` subcommand.
+
+```bash
+
+$ pulsar-admin schemas get <topic-name>
+
+{
+    "version": 0,
+    "type": "String",
+    "timestamp": 0,
+    "data": "string",
+    "properties": {
+        "property1": "string",
+        "property2": "string"
+    }
+}
+
+```
+
+</TabItem>
+<TabItem value="REST API">
+
+Send a `GET` request to this endpoint: {@inject: endpoint|GET|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/getSchema?version=@pulsar:version_number@}
+
+Here is an example of a response, which is returned in JSON format.
+
+```json
+
+{
+    "version": "<the-version-number-of-the-schema>",
+    "type": "<the-schema-type>",
+    "timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>",
+    "data": "<an-utf8-encoded-string-of-schema-definition-data>",
+    "properties": {} // the properties associated with the schema
+}
+
+```
+
+The response includes the following fields:
+
+| Field |  Description | 
+| --- | --- |
+|  `version`  |   The schema version, which is a long number. | 
+|  `type`  |   The schema type. | 
+|  `timestamp`  |   The timestamp of creating this version of schema. | 
+|  `data`  |   The schema definition data, which is encoded in UTF-8 charset. <li>If the schema is a **primitive** schema, this field should be blank. </li><li>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition. </li> | 
+|  `properties`  |  The additional properties associated with the schema. |
+
+</TabItem>
+<TabItem value="Java Admin API">
+
+```java
+
+SchemaInfo getSchema(String topic)
+
+```
+
+The `SchemaInfo` includes the following fields:
+
+| Field |  Description | 
+| --- | --- |
+|  `name`  |   The schema name. | 
+|  `type`  |   The schema type. | 
+|  `schema`  |   A byte array of the schema definition data, which is encoded in UTF-8 charset. <li>If the schema is a **primitive** schema, this byte array should be empty. </li><li>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition converted to a byte array. </li> | 
+|  `properties`  |  The additional properties associated with the schema. | 
+
+Here is an example of `SchemaInfo`:
+
+```java
+
+PulsarAdmin admin = …;
+
+SchemaInfo si = admin.getSchema("my-tenant/my-ns/my-topic");
+
+```
+
+</TabItem>
+
+</Tabs>
+
+### Get a schema (specific)
+
+To get a specific version of a schema, you can use one of the following methods.
+
+<Tabs 
+  defaultValue="Admin CLI"
+  values={[{"label":"Admin CLI","value":"Admin CLI"},{"label":"REST API","value":"REST API"},{"label":"Java Admin API","value":"Java Admin API"}]}>
+
+<TabItem value="Admin CLI">
+
+Use the `get` subcommand.
+
+```bash
+
+$ pulsar-admin schemas get <topic-name> --version=<version>
+
+```
+
+</TabItem>
+<TabItem value="REST API">
+
+Send a `GET` request to a schema endpoint: {@inject: endpoint|GET|/admin/v2/schemas/:tenant/:namespace/:topic/schema/:version|operation/getSchema?version=@pulsar:version_number@}
+
+Here is an example of a response, which is returned in JSON format.
+
+```json
+
+{
+    "version": "<the-version-number-of-the-schema>",
+    "type": "<the-schema-type>",
+    "timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>",
+    "data": "<an-utf8-encoded-string-of-schema-definition-data>",
+    "properties": {} // the properties associated with the schema
+}
+
+```
+
+The response includes the following fields:
+
+| Field |  Description | 
+| --- | --- |
+|  `version`  |   The schema version, which is a long number. | 
+|  `type`  |   The schema type. | 
+|  `timestamp`  |   The timestamp of creating this version of schema. | 
+|  `data`  |   The schema definition data, which is encoded in UTF-8 charset. <li>If the schema is a **primitive** schema, this field should be blank. </li><li>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition. </li> | 
+|  `properties`  |  The additional properties associated with the schema. |
+
+</TabItem>
+<TabItem value="Java Admin API">
+
+```java
+
+SchemaInfo getSchema(String topic, long version)
+
+```
+
+The `SchemaInfo` includes the following fields:
+
+| Field |  Description | 
+| --- | --- |
+|  `name`  |  The schema name. | 
+|  `type`  |  The schema type. | 
+|  `schema`  |   A byte array of the schema definition data, which is encoded in UTF-8. <li>If the schema is a **primitive** schema, this byte array should be empty. </li><li>If the schema is a **struct** schema, this field should be a JSON string of the Avro schema definition converted to a byte array. </li> | 
+|  `properties`  |  The additional properties associated with the schema. | 
+
+Here is an example of `SchemaInfo`:
+
+```java
+
+PulsarAdmin admin = …;
+
+SchemaInfo si = admin.getSchema("my-tenant/my-ns/my-topic", 1L);
+
+```
+
+</TabItem>
+
+</Tabs>
+
+### Extract a schema
+
+To extract a schema from a Java class and provide it to a topic, you can use the following method.
+
+<Tabs 
+  defaultValue="Admin CLI"
+  values={[{"label":"Admin CLI","value":"Admin CLI"}]}>
+
+<TabItem value="Admin CLI">
+
+Use the `extract` subcommand.
+
+```bash
+
+$ pulsar-admin schemas extract --classname <class-name> --jar <jar-path> --type <type-name>
+
+```
+
+</TabItem>
+
+</Tabs>
+
+### Delete a schema
+
+To delete a schema for a topic, you can use one of the following methods.
+
+:::note
+
+In any case, the **delete** action deletes **all versions** of a schema registered for a topic.
+
+:::
+
+<Tabs 
+  defaultValue="Admin CLI"
+  values={[{"label":"Admin CLI","value":"Admin CLI"},{"label":"REST API","value":"REST API"},{"label":"Java Admin API","value":"Java Admin API"}]}>
+
+<TabItem value="Admin CLI">
+
+Use the `delete` subcommand.
+
+```bash
+
+$ pulsar-admin schemas delete <topic-name>
+
+```
+
+</TabItem>
+<TabItem value="REST API">
+
+Send a `DELETE` request to a schema endpoint: {@inject: endpoint|DELETE|/admin/v2/schemas/:tenant/:namespace/:topic/schema|operation/deleteSchema?version=@pulsar:version_number@}
+
+Here is an example of a response, which is returned in JSON format.
+
+```json
+
+{
+    "version": "<the-latest-version-number-of-the-schema>",
+}
+
+```
+
+The response includes the following field:
+
+| Field | Description |
+| --- | --- |
+| `version` | The schema version, which is a long number. |
+
+</TabItem>
+<TabItem value="Java Admin API">
+
+```java
+
+void deleteSchema(String topic)
+
+```
+
+Here is an example of deleting a schema.
+
+```java
+
+PulsarAdmin admin = …;
+
+admin.deleteSchema("my-tenant/my-ns/my-topic");
+
+```
+
+</TabItem>
+
+</Tabs>
+
+## Custom schema storage
+
+By default, Pulsar stores schemas of various data types in [Apache BookKeeper](https://bookkeeper.apache.org), which is deployed alongside Pulsar. 
+
+However, you can use another storage system if needed. 
+
+### Implement
+
+To use a non-default (non-BookKeeper) storage system for Pulsar schemas, you need to implement the following Java interfaces: 
+
+* [SchemaStorage interface](#schemastorage-interface) 
+
+* [SchemaStorageFactory interface](#schemastoragefactory-interface)
+
+#### SchemaStorage interface
+
+The `SchemaStorage` interface has the following methods:
+
+```java
+
+public interface SchemaStorage {
+    // How schemas are updated
+    CompletableFuture<SchemaVersion> put(String key, byte[] value, byte[] hash);
+
+    // How schemas are fetched from storage
+    CompletableFuture<StoredSchema> get(String key, SchemaVersion version);
+
+    // How schemas are deleted
+    CompletableFuture<SchemaVersion> delete(String key);
+
+    // Utility method for converting a schema version byte array to a SchemaVersion object
+    SchemaVersion versionFromBytes(byte[] version);
+
+    // Startup behavior for the schema storage client
+    void start() throws Exception;
+
+    // Shutdown behavior for the schema storage client
+    void close() throws Exception;
+}
+
+```
+
+:::tip
+
+For a complete example of **schema storage** implementation, see [BookKeeperSchemaStorage](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java) class.
+
+:::
+
+#### SchemaStorageFactory interface 
+
+The `SchemaStorageFactory` interface has the following method:
+
+```java
+
+public interface SchemaStorageFactory {
+    @NotNull
+    SchemaStorage create(PulsarService pulsar) throws Exception;
+}
+
+```
+
+:::tip
+
+For a complete example of **schema storage factory** implementation, see [BookKeeperSchemaStorageFactory](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorageFactory.java) class.
+
+:::
+
+### Deploy
+
+To use your custom schema storage implementation, perform the following steps.
+
+1. Package the implementation in a [JAR](https://docs.oracle.com/javase/tutorial/deployment/jar/basicsindex.html) file.
+   
+2. Add the JAR file to the `lib` folder in your Pulsar binary or source distribution.
+   
+3. Change the `schemaRegistryStorageClassName` configuration in `broker.conf` to your custom factory class.
+      
+4. Start Pulsar.
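+
+For step 3, the change is a single property in `broker.conf`; the factory class name below is a hypothetical placeholder for your own `SchemaStorageFactory` implementation:
+
+```
+
+schemaRegistryStorageClassName=com.example.schema.MySchemaStorageFactory
+
+```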
diff --git a/site2/website-next/versioned_docs/version-2.4.1/schema-understand.md b/site2/website-next/versioned_docs/version-2.4.1/schema-understand.md
new file mode 100644
index 0000000..98114e3
--- /dev/null
+++ b/site2/website-next/versioned_docs/version-2.4.1/schema-understand.md
@@ -0,0 +1,454 @@
+---
+id: schema-understand
+title: Understand schema
+sidebar_label: "Understand schema"
+original_id: schema-understand
+---
+
+This chapter explains the basic concepts of Pulsar schema, focuses on the topics of particular importance, and provides additional background.
+
+## SchemaInfo
+
+Pulsar schema is defined in a data structure called `SchemaInfo`. 
+
+The `SchemaInfo` is stored and enforced on a per-topic basis and cannot be stored at the namespace or tenant level.
+
+A `SchemaInfo` consists of the following fields:
+
+|  Field  |   Description  | 
+| --- | --- |
+|  `name`  |   Schema name (a string).  | 
+|  `type`  |   Schema type, which determines how to interpret the schema data. <li>Predefined schema: see [here](schema-understand.md#schema-type). </li><li>Customized schema: it is left as an empty string. </li> | 
+|  `schema` (`payload`)  |   Schema data, which is a sequence of 8-bit unsigned bytes and schema-type specific.  | 
+|  `properties`  |   User-defined properties as a string/string map. Applications can use this bag to carry any application-specific logic. Possible properties might be the Git hash associated with the schema, or an environment string like `dev` or `prod`.  | 
+
+**Example**
+
+This is the `SchemaInfo` of a string.
+
+```json
+
+{
+    "name": "test-string-schema",
+    "type": "STRING",
+    "schema": "",
+    "properties": {}
+}
+
+```
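+
+In the Java client, you can inspect this structure directly. A small sketch using `getSchemaInfo()` on the built-in string schema:
+
+```java
+
+// Retrieve the SchemaInfo backing the built-in string schema.
+SchemaInfo info = Schema.STRING.getSchemaInfo();
+System.out.println(info.getType());          // STRING
+System.out.println(info.getSchema().length); // 0: primitive schemas carry no schema data
+
+```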
+
+## Schema type
+
+Pulsar supports various schema types, which are mainly divided into two categories: 
+
+* Primitive type 
+
+* Complex type
+
+### Primitive type
+
+Currently, Pulsar supports the following primitive types:
+
+| Primitive Type | Description |
+|---|---|
+| `BOOLEAN` | A binary value |
+| `INT8` | An 8-bit signed integer |
+| `INT16` | A 16-bit signed integer |
+| `INT32` | A 32-bit signed integer |
+| `INT64` | A 64-bit signed integer |
+| `FLOAT` | A single precision (32-bit) IEEE 754 floating-point number |
+| `DOUBLE` | A double-precision (64-bit) IEEE 754 floating-point number |
+| `BYTES` | A sequence of 8-bit unsigned bytes |
+| `STRING` | A Unicode character sequence |
+| `TIMESTAMP` (`DATE`, `TIME`) |  A logical type that represents a specific instant in time with millisecond precision. <br />It stores the number of milliseconds since `January 1, 1970, 00:00:00 GMT` as an `INT64` value | 
+
+For primitive types, Pulsar does not store any schema data in `SchemaInfo`. The `type` in `SchemaInfo` is used to determine how to serialize and deserialize the data. 
+
+Some of the primitive schema implementations can use `properties` to store implementation-specific tunable settings. For example, a `string` schema can use `properties` to store the encoding charset to serialize and deserialize strings.
+
+The conversions between **Pulsar schema types** and **language-specific primitive types** are as below.
+
+| Schema Type | Java Type| Python Type | Go Type |
+|---|---|---|---|
+| BOOLEAN | boolean | bool | bool |
+| INT8 | byte | | int8 |
+| INT16 | short | | int16 |
+| INT32 | int | | int32 |
+| INT64 | long | | int64 |
+| FLOAT | float | float | float32 |
+| DOUBLE | double | float | float64|
+| BYTES | byte[], ByteBuffer, ByteBuf | bytes | []byte |
+| STRING | string | str | string| 
+| TIMESTAMP | java.sql.Timestamp | | |
+| TIME | java.sql.Time | | |
+| DATE | java.util.Date | | |
+
+**Example**
+
+This example demonstrates how to use a string schema.
+
+1. Create a producer with a string schema and send messages.
+
+   ```java
+   
+   Producer<String> producer = client.newProducer(Schema.STRING).create();
+   producer.newMessage().value("Hello Pulsar!").send();
+   
+   ```
+
+2. Create a consumer with a string schema and receive messages.  
+
+   ```java
+   
+   Consumer<String> consumer = client.newConsumer(Schema.STRING).subscribe();
+   consumer.receive();
+   
+   ```
+
+### Complex type
+
+Currently, Pulsar supports the following complex types:
+
+| Complex Type | Description |
+|---|---|
+| `keyvalue` | Represents a complex type of a key/value pair. |
+| `struct` | Supports **AVRO**, **JSON**, and **Protobuf**. |
+
+#### keyvalue
+
+`Keyvalue` schema helps applications define schemas for both key and value. 
+
+For `SchemaInfo` of `keyvalue` schema, Pulsar stores the `SchemaInfo` of key schema and the `SchemaInfo` of value schema together.
+
+Pulsar provides two methods to encode a key/value pair in messages:
+
+* `INLINE`
+
+* `SEPARATED`
+
+Users can choose the encoding type when constructing the key/value schema.
+
+##### INLINE
+
+Key/value pairs will be encoded together in the message payload.
+
+##### SEPARATED
+
+Key will be encoded in the message key and the value will be encoded in the message payload. 
+  
+**Example**
+    
+This example shows how to construct a key/value schema and then use it to produce and consume messages.
+
+1. Construct a key/value schema with `INLINE` encoding type.
+
+   ```java
+   
+   Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
+           Schema.INT32,
+           Schema.STRING,
+           KeyValueEncodingType.INLINE);
+   
+   ```
+
+2. Optionally, construct a key/value schema with `SEPARATED` encoding type.
+
+   ```java
+   
+   Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
+           Schema.INT32,
+           Schema.STRING,
+           KeyValueEncodingType.SEPARATED);
+   
+   ```
+
+3. Produce messages using a key/value schema.
+
+   ```java
+   
+   Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
+           Schema.INT32,
+           Schema.STRING,
+           KeyValueEncodingType.SEPARATED);
+
+   Producer<KeyValue<Integer, String>> producer = client.newProducer(kvSchema)
+       .topic(TOPIC)
+       .create();
+
+   final int key = 100;
+   final String value = "value-100";
+
+   // send the key/value message
+   producer.newMessage()
+           .value(new KeyValue<>(key, value))
+           .send();
+   
+   ```
+
+4. Consume messages using a key/value schema.
+
+   ```java
+   
+   Schema<KeyValue<Integer, String>> kvSchema = Schema.KeyValue(
+           Schema.INT32,
+           Schema.STRING,
+           KeyValueEncodingType.SEPARATED);
+
+   Consumer<KeyValue<Integer, String>> consumer = client.newConsumer(kvSchema)
+       ...
+       .topic(TOPIC)
+       .subscriptionName(SubscriptionName).subscribe();
+
+   // receive key/value pair
+   Message<KeyValue<Integer, String>> msg = consumer.receive();
+   KeyValue<Integer, String> kv = msg.getValue();
+   
+   ```
+
+#### struct
+
+Pulsar uses [Avro Specification](http://avro.apache.org/docs/current/spec.html) to declare the schema definition for `struct` schema. 
+
+This allows Pulsar:
+
+* to use the same tools to manage schema definitions
+
+* to use different serialization/deserialization methods to handle data
+
+There are two methods to use `struct` schema:
+
+* `static`
+
+* `generic`
+
+##### static
+
+You can predefine the `struct` schema, and it can be a POJO in Java, a `struct` in Go, or classes generated by Avro or Protobuf tools. 
+
+**Example** 
+
+Pulsar gets the schema definition from the predefined `struct` using an Avro library. The schema definition is the schema data stored as a part of the `SchemaInfo`.
+
+1. Create the _User_ class to define the messages sent to Pulsar topics.
+
+   ```java
+   
+   public class User {
+       String name;
+       int age;
+   }
+   
+   ```
+
+2. Create a producer with a `struct` schema and send messages.
+
+   ```java
+   
+   Producer<User> producer = client.newProducer(Schema.AVRO(User.class)).create();
+   producer.newMessage().value(new User("pulsar-user", 1)).send();
+   
+   ```
+
+3. Create a consumer with a `struct` schema and receive messages.
+
+   ```java
+   
+   Consumer<User> consumer = client.newConsumer(Schema.AVRO(User.class)).subscribe();
+   User user = consumer.receive().getValue();
+   
+   ```
+
+##### generic
+
+Sometimes applications do not have pre-defined structs, and you can use this method to define schema and access data.
+
+You can define the `struct` schema using the `GenericSchemaBuilder`, generate a generic struct using `GenericRecordBuilder` and consume messages into `GenericRecord`.
+
+**Example** 
+
+1. Use `RecordSchemaBuilder` to build a schema.
+
+   ```java
+   
+   RecordSchemaBuilder recordSchemaBuilder = SchemaBuilder.record("schemaName");
+   recordSchemaBuilder.field("intField").type(SchemaType.INT32);
+   SchemaInfo schemaInfo = recordSchemaBuilder.build(SchemaType.AVRO);
+
+   GenericSchema<GenericRecord> schema = Schema.generic(schemaInfo);
+   Producer<GenericRecord> producer = client.newProducer(schema).create();
+   
+   ```
+
+2. Use `GenericRecordBuilder` to build the struct records.
+
+   ```java
+   
+   producer.newMessage().value(schema.newRecordBuilder()
+               .set("intField", 32)
+               .build()).send();
+   
+   ```
+
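+On the consuming side, messages built this way can be received as `GenericRecord` and read field by field. A sketch (the topic and subscription names are illustrative):
+
+```java
+
+Consumer<GenericRecord> consumer = client.newConsumer(schema)
+        .topic("my-topic")
+        .subscriptionName("my-sub")
+        .subscribe();
+
+GenericRecord record = consumer.receive().getValue();
+int intField = (int) record.getField("intField");
+
+```
+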
+### Auto Schema
+
+If you don't know the schema type of a Pulsar topic in advance, you can use AUTO schema to produce or consume generic records to or from brokers.
+
+| Auto Schema Type | Description |
+|---|---|
+| `AUTO_PRODUCE` | This is useful for transferring data **from a producer to a Pulsar topic that has a schema**. |
+| `AUTO_CONSUME` | This is useful for transferring data **from a Pulsar topic that has a schema to a consumer**. |
+
+#### AUTO_PRODUCE
+
+`AUTO_PRODUCE` schema helps a producer validate whether the bytes it sends are compatible with the schema of a topic. 
+
+**Example**
+
+Suppose that:
+
+* You have a producer processing messages from a Kafka topic _K_. 
+
+* You have a Pulsar topic _P_, and you do not know its schema type.
+
+* Your application reads the messages from _K_ and writes the messages to _P_.  
+   
+In this case, you can use `AUTO_PRODUCE` to verify whether the bytes produced by _K_ can be sent to _P_ or not.
+
+```java
+
+Producer<byte[]> pulsarProducer = client.newProducer(Schema.AUTO_PRODUCE_BYTES())
+    …
+    .create();
+
+byte[] kafkaMessageBytes = … ;
+
+pulsarProducer.send(kafkaMessageBytes);
+
+```
+
+#### AUTO_CONSUME
+
+`AUTO_CONSUME` schema helps a consumer validate whether the bytes it receives from a Pulsar topic are compatible with the consumer. That is, the consumer deserializes messages into language-specific objects using the `SchemaInfo` retrieved from the broker side. 
+
+Currently, `AUTO_CONSUME` only supports **AVRO** and **JSON** schemas. It deserializes messages into `GenericRecord`.
+
+**Example**
+
+Suppose that:
+
+* You have a Pulsar topic _P_.
+
+* You have a consumer (for example, MySQL) receiving messages from the topic _P_.
+
+* Your application reads the messages from _P_ and writes the messages to MySQL.
+   
+In this case, you can use `AUTO_CONSUME` to verify whether the bytes produced by _P_ can be sent to MySQL or not.
+
+```java
+
+Consumer<GenericRecord> pulsarConsumer = client.newConsumer(Schema.AUTO_CONSUME())
+    …
+    .subscribe();
+
+Message<GenericRecord> msg = pulsarConsumer.receive();
+GenericRecord record = msg.getValue();
+…
+
+```
+
+## Schema version
+
+Each `SchemaInfo` stored with a topic has a version. Schema version manages schema changes happening within a topic. 
+
+Messages produced with a given `SchemaInfo` are tagged with a schema version, so when a message is consumed by a Pulsar client, the Pulsar client can use the schema version to retrieve the corresponding `SchemaInfo` and then use the `SchemaInfo` to deserialize data.
+
+Schemas are versioned in succession. Schema storage happens in a broker that handles the associated topics so that version assignments can be made. 
+
+Once a version is assigned to or fetched for a schema, all subsequent messages produced by that producer are tagged with the appropriate version.
+
+**Example**
+
+The following example illustrates how the schema version works.
+
+Suppose that a Pulsar [Java client](client-libraries-java) created using the code below attempts to connect to Pulsar and begins to send messages:
+
+```java
+
+PulsarClient client = PulsarClient.builder()
+        .serviceUrl("pulsar://localhost:6650")
+        .build();
+
+Producer<SensorReading> producer = client.newProducer(JSONSchema.of(SensorReading.class))
+        .topic("sensor-data")
+        .sendTimeout(3, TimeUnit.SECONDS)
+        .create();
+
+```
+
+The table below lists the possible scenarios when this connection attempt occurs and what happens in each scenario:
+
+| Scenario |  What happens | 
+| --- | --- |
+|  <li>No schema exists for the topic. </li> |   (1) The producer is created using the given schema. (2) Since no existing schema is compatible with the `SensorReading` schema, the schema is transmitted to the broker and stored. (3) Any consumer created using the same schema or topic can consume messages from the `sensor-data` topic.  | 
+|  <li>A schema already exists. </li><li>The producer connects using the same schema that is already stored. </li> |   (1) The schema is transmitted to the broker. (2) The broker determines that the schema is compatible. (3) The broker attempts to store the schema in [BookKeeper](concepts-architecture-overview.md#persistent-storage) but then determines that it's already stored, so it is used to tag produced messages.  | 
+|  <li>A schema already exists. </li><li>The producer connects using a new schema that is compatible. </li> |   (1) The schema is transmitted to the broker. (2) The broker determines that the schema is compatible. (3) The broker stores the new schema as the current version (with a new version number).  | 
+
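+On the consumer side, the tagged version is visible on each received message. A small sketch, assuming a consumer subscribed to the same topic (`getSchemaVersion()` is part of the Java `Message` interface):
+
+```java
+
+Message<SensorReading> msg = consumer.receive();
+byte[] schemaVersion = msg.getSchemaVersion(); // the version the broker tagged this message with
+
+```
+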
+## How does schema work
+
+Pulsar schemas are applied and enforced at the **topic** level (schemas cannot be applied at the namespace or tenant level). 
+
+Producers and consumers upload schemas to brokers, so Pulsar schemas work on the producer side and the consumer side.
+
+### Producer side
+
+This diagram illustrates how schema works on the producer side.
+
+![Schema works at the producer side](/assets/schema-producer.png)
+
+1. The application uses a schema instance to construct a producer instance. 
+
+   The schema instance defines the schema for the data being produced using the producer instance. 
+
+   Take AVRO as an example: Pulsar extracts the schema definition from the POJO class and constructs the `SchemaInfo` that the producer needs to pass to a broker when it connects.
+
+2. The producer connects to the broker with the `SchemaInfo` extracted from the passed-in schema instance.
+   
+3. The broker looks up the schema in the schema storage to check if it is already a registered schema. 
+   
+4. If yes, the broker skips the schema validation since it is a known schema, and returns the schema version to the producer.
+  
+5. If no, the broker validates the schema based on the schema compatibility check strategy defined for the topic. 
+  
+6. If the schema is compatible, the broker stores it and returns the schema version to the producer. 
+
+   All the messages produced by this producer are tagged with the schema version. 
+
+7. If the schema is incompatible, the broker rejects it.
+
+### Consumer side
+
+This diagram illustrates how schema works on the consumer side. 
+
+![Schema works at the consumer side](/assets/schema-consumer.png)
+
+1. The application uses a schema instance to construct a consumer instance.
+   
+   The schema instance defines the schema that the consumer uses for decoding messages received from a broker.
+
+2. The consumer connects to the broker with the `SchemaInfo` extracted from the passed-in schema instance.
+   
+3. The broker looks up the schema in the schema storage to check if it is already a registered schema. 
+   
+4. If yes, the broker skips the schema validation since it is a known schema, and returns the schema version to the consumer.
+
+5. If no, the broker validates the schema based on the schema compatibility check strategy defined for the topic. 
+   
+6. If the schema is compatible, the broker stores it and returns the schema version to the consumer. 
+   
+7. If the schema is incompatible, the consumer will be disconnected.
+
+8. The consumer receives the messages from the broker. 
+
+   If the schema used by the consumer supports schema versioning (for example, AVRO schema), the consumer fetches the `SchemaInfo` of the version tagged in the messages, and uses both the passed-in schema and the tagged schema to decode the messages.
diff --git a/site2/website-next/versioned_docs/version-2.4.1/standalone-docker.md b/site2/website-next/versioned_docs/version-2.4.1/standalone-docker.md
new file mode 100644
index 0000000..05ac2a1
--- /dev/null
+++ b/site2/website-next/versioned_docs/version-2.4.1/standalone-docker.md
@@ -0,0 +1,179 @@
+---
+id: standalone-docker
+title: Set up a standalone Pulsar in Docker
+sidebar_label: "Run Pulsar in Docker"
+original_id: standalone-docker
+---
+
+For local development and testing, you can run Pulsar in standalone
+mode on your own machine within a Docker container.
+
+If you have not installed Docker, download the [Community edition](https://www.docker.com/community-edition)
+and follow the instructions for your OS.
+
+## Start Pulsar in Docker
+
+* For MacOS, Linux, and Windows:
+
+  ```shell
+  
+  $ docker run -it \
+  -p 6650:6650 \
+  -p 8080:8080 \
+  --mount source=pulsardata,target=/pulsar/data \
+  --mount source=pulsarconf,target=/pulsar/conf \
+  apachepulsar/pulsar:@pulsar:version@ \
+  bin/pulsar standalone
+  
+  ```
+
+A few things to note about this command:
+ * The data, metadata, and configuration are persisted on Docker volumes so that the container does not start "fresh" every time it is restarted. For details on the volumes, run `docker volume inspect <sourcename>`.
+ * For Docker on Windows, make sure to configure it to use Linux containers.
+
+If you start Pulsar successfully, you will see `INFO`-level log messages like this:
+
+```
+
+2017-08-09 22:34:04,030 - INFO  - [main:WebService@213] - Web Service started at http://127.0.0.1:8080
+2017-08-09 22:34:04,038 - INFO  - [main:PulsarService@335] - messaging service is ready, bootstrap service on port=8080, broker url=pulsar://127.0.0.1:6650, cluster=standalone, configs=org.apache.pulsar.broker.ServiceConfiguration@4db60246
+...
+
+```
+
+:::tip
+
+When you start a local standalone cluster, a `public/default` namespace is created automatically. The namespace is used for development purposes. All Pulsar topics are managed within namespaces.
+For more information, see [Topics](concepts-messaging.md#topics).
+
+:::
+
+## Use Pulsar in Docker
+
+Pulsar offers client libraries for [Java](client-libraries-java.md), [Go](client-libraries-go.md), [Python](client-libraries-python) 
+and [C++](client-libraries-cpp). If you're running a local standalone cluster, you can
+use one of these root URLs to interact with your cluster:
+
+* `pulsar://localhost:6650`
+* `http://localhost:8080`
+
+The following example guides you through getting started with Pulsar quickly by using the [Python](client-libraries-python)
+client API.
+
+Install the Pulsar Python client library directly from [PyPI](https://pypi.org/project/pulsar-client/):
+
+```shell
+
+$ pip install pulsar-client
+
+```
+
+### Consume a message
+
+Create a consumer and subscribe to the topic:
+
+```python
+
+import pulsar
+
+client = pulsar.Client('pulsar://localhost:6650')
+consumer = client.subscribe('my-topic',
+                            subscription_name='my-sub')
+
+while True:
+    msg = consumer.receive()
+    print("Received message: '%s'" % msg.data())
+    consumer.acknowledge(msg)
+
+client.close()
+
+```
+
+### Produce a message
+
+Now start a producer to send some test messages:
+
+```python
+
+import pulsar
+
+client = pulsar.Client('pulsar://localhost:6650')
+producer = client.create_producer('my-topic')
+
+for i in range(10):
+    producer.send(('hello-pulsar-%d' % i).encode('utf-8'))
+
+client.close()
+
+```
+
+## Get the topic statistics
+
+In Pulsar, you can use REST, Java, or command-line tools to control every aspect of the system.
+For details on APIs, refer to [Admin API Overview](admin-api-overview).
+
+In the simplest example, you can use curl to probe the stats for a particular topic:
+
+```shell
+
+$ curl http://localhost:8080/admin/v2/persistent/public/default/my-topic/stats | python -m json.tool
+
+```
+
+The output is something like this:
+
+```json
+
+{
+  "averageMsgSize": 0.0,
+  "msgRateIn": 0.0,
+  "msgRateOut": 0.0,
+  "msgThroughputIn": 0.0,
+  "msgThroughputOut": 0.0,
+  "publishers": [
+    {
+      "address": "/172.17.0.1:35048",
+      "averageMsgSize": 0.0,
+      "clientVersion": "1.19.0-incubating",
+      "connectedSince": "2017-08-09 20:59:34.621+0000",
+      "msgRateIn": 0.0,
+      "msgThroughputIn": 0.0,
+      "producerId": 0,
+      "producerName": "standalone-0-1"
+    }
+  ],
+  "replication": {},
+  "storageSize": 16,
+  "subscriptions": {
+    "my-sub": {
+      "blockedSubscriptionOnUnackedMsgs": false,
+      "consumers": [
+        {
+          "address": "/172.17.0.1:35064",
+          "availablePermits": 996,
+          "blockedConsumerOnUnackedMsgs": false,
+          "clientVersion": "1.19.0-incubating",
+          "connectedSince": "2017-08-09 21:05:39.222+0000",
+          "consumerName": "166111",
+          "msgRateOut": 0.0,
+          "msgRateRedeliver": 0.0,
+          "msgThroughputOut": 0.0,
+          "unackedMessages": 0
+        }
+      ],
+      "msgBacklog": 0,
+      "msgRateExpired": 0.0,
+      "msgRateOut": 0.0,
+      "msgRateRedeliver": 0.0,
+      "msgThroughputOut": 0.0,
+      "type": "Exclusive",
+      "unackedMessages": 0
+    }
+  }
+}
+
+```
+
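+If you prefer the Java admin client mentioned above, a minimal sketch (assuming the standalone defaults) retrieves the same stats programmatically:
+
+```java
+
+PulsarAdmin admin = PulsarAdmin.builder()
+        .serviceHttpUrl("http://localhost:8080")
+        .build();
+TopicStats stats = admin.topics().getStats("persistent://public/default/my-topic");
+System.out.println(stats.msgRateIn);
+admin.close();
+
+```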
diff --git a/site2/website-next/versioned_docs/version-2.7.2/developing-binary-protocol.md b/site2/website-next/versioned_docs/version-2.7.2/developing-binary-protocol.md
deleted file mode 100644
index b233f10..0000000
--- a/site2/website-next/versioned_docs/version-2.7.2/developing-binary-protocol.md
+++ /dev/null
@@ -1,581 +0,0 @@
----
-id: develop-binary-protocol
-title: Pulsar binary protocol specification
-sidebar_label: "Binary protocol"
-original_id: develop-binary-protocol
----
-
-Pulsar uses a custom binary protocol for communications between producers/consumers and brokers. This protocol is designed to support required features, such as acknowledgements and flow control, while ensuring maximum transport and implementation efficiency.
-
-Clients and brokers exchange *commands* with each other. Commands are formatted as binary [protocol buffer](https://developers.google.com/protocol-buffers/) (aka *protobuf*) messages. The format of protobuf commands is specified in the [`PulsarApi.proto`](https://github.com/apache/pulsar/blob/master/pulsar-common/src/main/proto/PulsarApi.proto) file and also documented in the [Protobuf interface](#protobuf-interface) section below.
-
-> ### Connection sharing
-> Commands for different producers and consumers can be interleaved and sent through the same connection without restriction.
-
-All commands associated with Pulsar's protocol are contained in a [`BaseCommand`](#pulsar.proto.BaseCommand) protobuf message that includes a [`Type`](#pulsar.proto.Type) [enum](https://developers.google.com/protocol-buffers/docs/proto#enum) with all possible subcommands as optional fields. `BaseCommand` messages can specify only one subcommand.
-
-## Framing
-
-Since protobuf doesn't provide any sort of message frame, all messages in the Pulsar protocol are prepended with a 4-byte field that specifies the size of the frame. The maximum allowable size of a single frame is 5 MB.
-
-The Pulsar protocol allows for two types of commands:
-
-1. **Simple commands** that do not carry a message payload.
-2. **Payload commands** that bear a payload that is used when publishing or delivering messages. In payload commands, the protobuf command data is followed by protobuf [metadata](#message-metadata) and then the payload, which is passed in raw format outside of protobuf. All sizes are passed as 4-byte unsigned big endian integers.
-
-> Message payloads are passed in raw format rather than protobuf format for efficiency reasons.
-
-### Simple commands
-
-Simple (payload-free) commands have this basic structure:
-
-| Component   | Description                                                                             | Size (in bytes) |
-|:------------|:----------------------------------------------------------------------------------------|:----------------|
-| totalSize   | The size of the frame, counting everything that comes after it (in bytes)               | 4               |
-| commandSize | The size of the protobuf-serialized command                                             | 4               |
-| message     | The protobuf message serialized in a raw binary format (rather than in protobuf format) |                 |
-
-### Payload commands
-
-Payload commands have this basic structure:
-
-| Component    | Description                                                                                 | Size (in bytes) |
-|:-------------|:--------------------------------------------------------------------------------------------|:----------------|
-| totalSize    | The size of the frame, counting everything that comes after it (in bytes)                   | 4               |
-| commandSize  | The size of the protobuf-serialized command                                                 | 4               |
-| message      | The protobuf message serialized in a raw binary format (rather than in protobuf format)     |                 |
-| magicNumber  | A 2-byte byte array (`0x0e01`) identifying the current format                               | 2               |
-| checksum     | A [CRC32-C checksum](http://www.evanjones.ca/crc32c.html) of everything that comes after it | 4               |
-| metadataSize | The size of the message [metadata](#message-metadata)                                       | 4               |
-| metadata     | The message [metadata](#message-metadata) stored as a binary protobuf message               |                 |
-| payload      | Anything left in the frame is considered the payload and can include any sequence of bytes  |                 |
-
-## Message metadata
-
-Message metadata is stored alongside the application-specified payload as a serialized protobuf message. Metadata is created by the producer and passed on unchanged to the consumer.
-
-| Field                                | Description                                                                                                                                                                                                                                               |
-|:-------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| `producer_name`                      | The name of the producer that published the message                                                                                                                                                                                         |
-| `sequence_id`                        | The sequence ID of the message, assigned by producer                                                                                                                                                                                        |
-| `publish_time`                       | The publish timestamp in Unix time (i.e. as the number of milliseconds since January 1st, 1970 in UTC)                                                                                                                                                    |
-| `properties`                         | A sequence of key/value pairs (using the [`KeyValue`](https://github.com/apache/pulsar/blob/master/pulsar-common/src/main/proto/PulsarApi.proto#L32) message). These are application-defined keys and values with no special meaning to Pulsar. |
-| `replicated_from` *(optional)*       | Indicates that the message has been replicated and specifies the name of the [cluster](reference-terminology.md#cluster) where the message was originally published                                                                                                             |
-| `partition_key` *(optional)*         | While publishing on a partition topic, if the key is present, the hash of the key is used to determine which partition to choose                                                                                                                          |
-| `compression` *(optional)*           | Signals that payload has been compressed and with which compression library                                                                                                                                                                               |
-| `uncompressed_size` *(optional)*     | If compression is used, the producer must fill the uncompressed size field with the original payload size                                                                                                                                                 |
-| `num_messages_in_batch` *(optional)* | If this message is really a [batch](#batch-messages) of multiple entries, this field must be set to the number of messages in the batch                                                                                                                   |
-
-### Batch messages
-
-When using batch messages, the payload contains a list of entries, each with
-its individual metadata defined by the `SingleMessageMetadata` object.
-
-
-For a single batch, the payload format will look like this:
-
-
-| Field         | Description                                                 |
-|:--------------|:------------------------------------------------------------|
-| metadataSizeN | The size of the single message metadata serialized Protobuf |
-| metadataN     | Single message metadata                                     |
-| payloadN      | Message payload passed by application                       |
-
-Each metadata field looks like this:
-
-| Field                      | Description                                             |
-|:---------------------------|:--------------------------------------------------------|
-| properties                 | Application-defined properties                          |
-| partition key *(optional)* | Key to indicate the hashing to a particular partition   |
-| payload_size               | Size of the payload for the single message in the batch |
-
-When compression is enabled, the whole batch will be compressed at once.
-
-## Interactions
-
-### Connection establishment
-
-After opening a TCP connection to a broker, typically on port 6650, the client
-is responsible for initiating the session.
-
-![Connect interaction](/assets/binary-protocol-connect.png)
-
-After receiving a `Connected` response from the broker, the client can
-consider the connection ready to use. Alternatively, if the broker doesn't
-validate the client authentication, it will reply with an `Error` command and
-close the TCP connection.
-
-Example:
-
-```protobuf
-
-message CommandConnect {
-  "client_version" : "Pulsar-Client-Java-v1.15.2",
-  "auth_method_name" : "my-authentication-plugin",
-  "auth_data" : "my-auth-data",
-  "protocol_version" : 6
-}
-
-```
-
-Fields:
- * `client_version` → String based identifier. Format is not enforced
- * `auth_method_name` → *(optional)* Name of the authentication plugin if auth
-   enabled
- * `auth_data` → *(optional)* Plugin specific authentication data
- * `protocol_version` → Indicates the protocol version supported by the
-   client. Broker will not send commands introduced in newer revisions of the
-   protocol. Broker might be enforcing a minimum version
-
-```protobuf
-
-message CommandConnected {
-  "server_version" : "Pulsar-Broker-v1.15.2",
-  "protocol_version" : 6
-}
-
-```
-
-Fields:
- * `server_version` → String identifier of broker version
- * `protocol_version` → Protocol version supported by the broker. Client
-   must not attempt to send commands introduced in newer revisions of the
-   protocol
-
-### Keep Alive
-
-To identify prolonged network partitions between clients and brokers or cases
-in which a machine crashes without interrupting the TCP connection on the remote
-end (eg: power outage, kernel panic, hard reboot...), we have introduced a
-mechanism to probe for the availability status of the remote peer.
-
-Both clients and brokers are sending `Ping` commands periodically and they will
-close the socket if a `Pong` response is not received within a timeout (default
-used by broker is 60s).
-
-A valid implementation of a Pulsar client is not required to send the `Ping`
-probe, though it is required to promptly reply after receiving one from the
-broker in order to prevent the remote side from forcibly closing the TCP connection.
-
-
-### Producer
-
-In order to send messages, a client needs to establish a producer. When creating
-a producer, the broker will first verify that this particular client is
-authorized to publish on the topic.
-
-Once the client gets confirmation of the producer creation, it can publish
-messages to the broker, referring to the producer id negotiated before.
-
-![Producer interaction](/assets/binary-protocol-producer.png)
-
-##### Command Producer
-
-```protobuf
-
-message CommandProducer {
-  "topic" : "persistent://my-property/my-cluster/my-namespace/my-topic",
-  "producer_id" : 1,
-  "request_id" : 1
-}
-
-```
-
-Parameters:
- * `topic` → Complete name of the topic on which to create the producer
- * `producer_id` → Client generated producer identifier. Needs to be unique
-    within the same connection
- * `request_id` → Identifier for this request. Used to match the response with
-    the originating request. Needs to be unique within the same connection
- * `producer_name` → *(optional)* If a producer name is specified, the name will
-    be used, otherwise the broker will generate a unique name. Generated
-    producer name is guaranteed to be globally unique. Implementations are
-    expected to let the broker generate a new producer name when the producer
-    is initially created, then reuse it when recreating the producer after
-    reconnections.
-
-The broker will reply with either `ProducerSuccess` or `Error` commands.
-
-##### Command ProducerSuccess
-
-```protobuf
-
-message CommandProducerSuccess {
-  "request_id" :  1,
-  "producer_name" : "generated-unique-producer-name"
-}
-
-```
-
-Parameters:
- * `request_id` → Original id of the `CreateProducer` request
- * `producer_name` → Generated globally unique producer name or the name
-    specified by the client, if any.
-
-##### Command Send
-
-Command `Send` is used to publish a new message within the context of an
-already existing producer. This command is used in a frame that includes command
-as well as message payload, for which the complete format is specified in the [payload commands](#payload-commands) section.
-
-```protobuf
-
-message CommandSend {
-  "producer_id" : 1,
-  "sequence_id" : 0,
-  "num_messages" : 1
-}
-
-```
-
-Parameters:
- * `producer_id` → id of an existing producer
- * `sequence_id` → each message has an associated sequence id which is expected
-   to be implemented with a counter starting at 0. The `SendReceipt` that
-   acknowledges the effective publishing of a messages will refer to it by
-   its sequence id.
- * `num_messages` → *(optional)* Used when publishing a batch of messages at
-   once.
-
-##### Command SendReceipt
-
-After a message has been persisted on the configured number of replicas, the
-broker will send the acknowledgment receipt to the producer.
-
-```protobuf
-
-message CommandSendReceipt {
-  "producer_id" : 1,
-  "sequence_id" : 0,
-  "message_id" : {
-    "ledgerId" : 123,
-    "entryId" : 456
-  }
-}
-
-```
-
-Parameters:
- * `producer_id` → id of producer originating the send request
- * `sequence_id` → sequence id of the published message
- * `message_id` → message id assigned by the system to the published message
-   Unique within a single cluster. Message id is composed of 2 longs, `ledgerId`
-   and `entryId`, that reflect that this unique id is assigned when appending
-   to a BookKeeper ledger
-
-
-##### Command CloseProducer
-
-**Note**: *This command can be sent by either producer or broker*.
-
-When receiving a `CloseProducer` command, the broker will stop accepting any
-more messages for the producer, wait until all pending messages are persisted
-and then reply `Success` to the client.
-
-The broker can send a `CloseProducer` command to client when it's performing
-a graceful failover (eg: broker is being restarted, or the topic is being unloaded
-by load balancer to be transferred to a different broker).
-
-When receiving the `CloseProducer`, the client is expected to go through the
-service discovery lookup again and recreate the producer again. The TCP
-connection is not affected.
-
-### Consumer
-
-A consumer is used to attach to a subscription and consume messages from it.
-After every reconnection, a client needs to subscribe to the topic. If a
-subscription is not already there, a new one will be created.
-
-![Consumer](/assets/binary-protocol-consumer.png)
-
-#### Flow control
-
-After the consumer is ready, the client needs to *give permission* to the
-broker to push messages. This is done with the `Flow` command.
-
-A `Flow` command gives additional *permits* to send messages to the consumer.
-A typical consumer implementation will use a queue to accumulate these messages
-before the application is ready to consume them.
-
-After the application has dequeued half of the messages in the queue, the consumer
-sends permits to the broker to ask for more messages (equal to half of the queue size).
-
-For example, if the queue size is 1000 and the consumer has consumed 500 messages from
-the queue, the consumer sends permits to the broker to ask for 500 more messages.
-
-##### Command Subscribe
-
-```protobuf
-
-message CommandSubscribe {
-  "topic" : "persistent://my-property/my-cluster/my-namespace/my-topic",
-  "subscription" : "my-subscription-name",
-  "subType" : "Exclusive",
-  "consumer_id" : 1,
-  "request_id" : 1
-}
-
-```
-
-Parameters:
- * `topic` → Complete name of the topic on which to create the consumer
- * `subscription` → Subscription name
- * `subType` → Subscription type: Exclusive, Shared, Failover, Key_Shared
- * `consumer_id` → Client generated consumer identifier. Needs to be unique
-    within the same connection
- * `request_id` → Identifier for this request. Used to match the response with
-    the originating request. Needs to be unique within the same connection
- * `consumer_name` → *(optional)* Clients can specify a consumer name. This
-    name can be used to track a particular consumer in the stats. Also, in
-    Failover subscription type, the name is used to decide which consumer is
-    elected as *master* (the one receiving messages): consumers are sorted by
-    their consumer name and the first one is elected master.
-
-##### Command Flow
-
-```protobuf
-
-message CommandFlow {
-  "consumer_id" : 1,
-  "messagePermits" : 1000
-}
-
-```
-
-Parameters:
-* `consumer_id` → Id of an already established consumer
-* `messagePermits` → Number of additional permits to grant to the broker for
-  pushing more messages
-
-##### Command Message
-
-Command `Message` is used by the broker to push messages to an existing consumer,
-within the limits of the given permits.
-
-
-This command is used in a frame that includes the message payload as well, for
-which the complete format is specified in the [payload commands](#payload-commands)
-section.
-
-```protobuf
-
-message CommandMessage {
-  "consumer_id" : 1,
-  "message_id" : {
-    "ledgerId" : 123,
-    "entryId" : 456
-  }
-}
-
-```
-
-##### Command Ack
-
-An `Ack` is used to signal to the broker that a given message has been
-successfully processed by the application and can be discarded by the broker.
-
-In addition, the broker will also maintain the consumer position based on the
-acknowledged messages.
-
-```protobuf
-
-message CommandAck {
-  "consumer_id" : 1,
-  "ack_type" : "Individual",
-  "message_id" : {
-    "ledgerId" : 123,
-    "entryId" : 456
-  }
-}
-
-```
-
-Parameters:
- * `consumer_id` → Id of an already established consumer
- * `ack_type` → Type of acknowledgment: `Individual` or `Cumulative`
- * `message_id` → Id of the message to acknowledge
- * `validation_error` → *(optional)* Indicates that the consumer has discarded
-   the messages due to: `UncompressedSizeCorruption`,
-   `DecompressionError`, `ChecksumMismatch`, `BatchDeSerializeError`
-
-##### Command CloseConsumer
-
-**Note**: *This command can be sent by either consumer or broker*.
-
-This command behaves the same as [`CloseProducer`](#command-closeproducer)
-
-##### Command RedeliverUnacknowledgedMessages
-
-A consumer can ask the broker to redeliver some or all of the pending messages
-that were pushed to that particular consumer and not yet acknowledged.
-
-The protobuf object accepts a list of message ids that the consumer wants to
-be redelivered. If the list is empty, the broker will redeliver all the
-pending messages.
-
-On redelivery, messages can be sent to the same consumer or, in the case of a
-shared subscription, spread across all available consumers.
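-
-From the Java client, redelivery is usually triggered indirectly, either by an
-acknowledgment timeout or by negatively acknowledging a message. A sketch under
-those assumptions (broker address and class name are illustrative):
-
-```java
-
-import java.util.concurrent.TimeUnit;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClient;
-
-public class RedeliverySketch {
-    public static void main(String[] args) throws Exception {
-        PulsarClient client = PulsarClient.builder()
-                .serviceUrl("pulsar://localhost:6650") // assumed broker address
-                .build();
-
-        Consumer<byte[]> consumer = client.newConsumer()
-                .topic("persistent://my-property/my-cluster/my-namespace/my-topic")
-                .subscriptionName("my-subscription-name")
-                // Messages left unacknowledged for 30s are redelivered.
-                .ackTimeout(30, TimeUnit.SECONDS)
-                .subscribe();
-
-        Message<byte[]> msg = consumer.receive();
-        // Ask for this specific message to be redelivered instead of acking.
-        consumer.negativeAcknowledge(msg);
-        client.close();
-    }
-}
-
-```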
-
-
-##### Command ReachedEndOfTopic
-
-This is sent by a broker to a particular consumer, whenever the topic
-has been "terminated" and all the messages on the subscription were
-acknowledged.
-
-The client should use this command to notify the application that no more
-messages will be delivered by the consumer.
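-
-In the Java client this is surfaced as a flag on the consumer; a small
-illustrative sketch:
-
-```java
-
-import org.apache.pulsar.client.api.Consumer;
-
-public class EndOfTopicSketch {
-    // Returns true once the broker has sent ReachedEndOfTopic, i.e. the
-    // topic was terminated and all messages have been acknowledged.
-    static boolean done(Consumer<byte[]> consumer) {
-        return consumer.hasReachedEndOfTopic();
-    }
-}
-
-```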
-
-##### Command ConsumerStats
-
-This command is sent by the client to retrieve Subscriber and Consumer level
-stats from the broker.
-
-Parameters:
- * `request_id` → Id of the request, used to correlate the request
-   and the response.
- * `consumer_id` → Id of an already established consumer.
-
-##### Command ConsumerStatsResponse
-
-This is the broker's response to a ConsumerStats request from the client.
-It contains the Subscriber and Consumer level stats of the `consumer_id` sent in the request.
-If the `error_code` or the `error_message` field is set, it indicates that the request has failed.
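-
-A related way to inspect the same kind of per-subscription and per-consumer
-stats, over HTTP rather than this binary command, is the Java admin client; a
-sketch assuming an admin service at `http://localhost:8080` (the class name is
-illustrative):
-
-```java
-
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.common.policies.data.TopicStats;
-
-public class StatsSketch {
-    public static void main(String[] args) throws Exception {
-        PulsarAdmin admin = PulsarAdmin.builder()
-                .serviceHttpUrl("http://localhost:8080") // assumed admin address
-                .build();
-
-        // Subscription- and consumer-level stats for the topic.
-        TopicStats stats = admin.topics()
-                .getStats("persistent://my-property/my-cluster/my-namespace/my-topic");
-        System.out.println(stats.subscriptions.keySet());
-        admin.close();
-    }
-}
-
-```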
-
-##### Command Unsubscribe
-
-This command is sent by the client to unsubscribe the `consumer_id` from the associated topic.
-
-Parameters:
- * `request_id` → Id of the request.
- * `consumer_id` → Id of an already established consumer which needs to unsubscribe.
-
-
-## Service discovery
-
-### Topic lookup
-
-Topic lookup needs to be performed each time a client needs to create or
-reconnect a producer or a consumer. Lookup is used to discover which particular
-broker is serving the topic we are about to use.
-
-Lookup can be done with a REST call as described in the [admin API](admin-api-topics.md#lookup-of-topic)
-docs.
-
-Since Pulsar 1.16, it is also possible to perform the lookup within the binary
-protocol.
-
-For the sake of example, let's assume we have a service discovery component
-running at `pulsar://broker.example.com:6650`.
-
-Individual brokers will be running at `pulsar://broker-1.example.com:6650`,
-`pulsar://broker-2.example.com:6650`, ...
-
-A client can use a connection to the discovery service host to issue a
-`LookupTopic` command. The response is either a broker hostname to
-connect to, or a broker hostname against which to retry the lookup.
-
-The `LookupTopic` command has to be used in a connection that has already
-gone through the `Connect` / `Connected` initial handshake.
-
-![Topic lookup](/assets/binary-protocol-topic-lookup.png)
-
-```protobuf
-
-message CommandLookupTopic {
-  "topic" : "persistent://my-property/my-cluster/my-namespace/my-topic",
-  "request_id" : 1,
-  "authoritative" : false
-}
-
-```
-
-Fields:
- * `topic` → Topic name to lookup
- * `request_id` → Id of the request that will be passed with its response
- * `authoritative` → Should be false on the initial lookup request. When following a
-   redirect response, the client should pass the same value contained in the
-   response.
-
-##### LookupTopicResponse
-
-Example of response with successful lookup:
-
-```protobuf
-
-message CommandLookupTopicResponse {
-  "request_id" : 1,
-  "response" : "Connect",
-  "brokerServiceUrl" : "pulsar://broker-1.example.com:6650",
-  "brokerServiceUrlTls" : "pulsar+ssl://broker-1.example.com:6651",
-  "authoritative" : true
-}
-
-```
-
-Example of lookup response with redirection:
-
-```protobuf
-
-message CommandLookupTopicResponse {
-  "request_id" : 1,
-  "response" : "Redirect",
-  "brokerServiceUrl" : "pulsar://broker-2.example.com:6650",
-  "brokerServiceUrlTls" : "pulsar+ssl://broker-2.example.com:6651",
-  "authoritative" : true
-}
-
-```
-
-In this second case, we need to reissue the `LookupTopic` command request
-to `broker-2.example.com`, and this broker will be able to give a definitive
-answer to the lookup request.
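-
-The same lookup can also be performed over HTTP with the Java admin client; a
-minimal sketch assuming an admin service at `http://localhost:8080` (the class
-name is illustrative):
-
-```java
-
-import org.apache.pulsar.client.admin.PulsarAdmin;
-
-public class LookupSketch {
-    public static void main(String[] args) throws Exception {
-        PulsarAdmin admin = PulsarAdmin.builder()
-                .serviceHttpUrl("http://localhost:8080") // assumed admin address
-                .build();
-
-        // Returns the URL of the broker serving the topic, e.g.
-        // pulsar://broker-1.example.com:6650
-        String brokerUrl = admin.lookups()
-                .lookupTopic("persistent://my-property/my-cluster/my-namespace/my-topic");
-        System.out.println(brokerUrl);
-        admin.close();
-    }
-}
-
-```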
-
-### Partitioned topics discovery
-
-Partitioned topics metadata discovery is used to find out if a topic is a
-"partitioned topic" and how many partitions were set up.
-
-If the topic is marked as "partitioned", the client is expected to create
-multiple producers or consumers, one for each partition, using the `partition-X`
-suffix.
-
-This information only needs to be retrieved the first time a producer or
-consumer is created. There is no need to do this after reconnections.
-
-The discovery of partitioned topics metadata works very similarly to the topic
-lookup. The client sends a request to the service discovery address, and the
-response will contain the actual metadata.
-
-##### Command PartitionedTopicMetadata
-
-```protobuf
-
-message CommandPartitionedTopicMetadata {
-  "topic" : "persistent://my-property/my-cluster/my-namespace/my-topic",
-  "request_id" : 1
-}
-
-```
-
-Fields:
- * `topic` → the topic for which to check the partitions metadata
- * `request_id` → Id of the request that will be passed with its response
-
-
-##### Command PartitionedTopicMetadataResponse
-
-Example of response with metadata:
-
-```protobuf
-
-message CommandPartitionedTopicMetadataResponse {
-  "request_id" : 1,
-  "response" : "Success",
-  "partitions" : 32
-}
-
-```
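-
-The same metadata is available over HTTP through the Java admin client; a
-sketch assuming an admin service at `http://localhost:8080` (the class name is
-illustrative):
-
-```java
-
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
-
-public class PartitionMetadataSketch {
-    public static void main(String[] args) throws Exception {
-        PulsarAdmin admin = PulsarAdmin.builder()
-                .serviceHttpUrl("http://localhost:8080") // assumed admin address
-                .build();
-
-        PartitionedTopicMetadata metadata = admin.topics()
-                .getPartitionedTopicMetadata(
-                        "persistent://my-property/my-cluster/my-namespace/my-topic");
-        // 0 means the topic is not partitioned; otherwise create one
-        // producer or consumer per partition-X suffix.
-        System.out.println(metadata.partitions);
-        admin.close();
-    }
-}
-
-```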
-
-## Protobuf interface
-
-All Pulsar's Protobuf definitions can be found {@inject: github:here:/pulsar-common/src/main/proto/PulsarApi.proto}.
diff --git a/site2/website-next/versioned_docs/version-2.7.2/developing-cpp.md b/site2/website-next/versioned_docs/version-2.7.2/developing-cpp.md
deleted file mode 100644
index 9da7a3a..0000000
--- a/site2/website-next/versioned_docs/version-2.7.2/developing-cpp.md
+++ /dev/null
@@ -1,114 +0,0 @@
----
-id: develop-cpp
-title: Building Pulsar C++ client
-sidebar_label: "Building Pulsar C++ client"
-original_id: develop-cpp
----
-
-## Supported platforms
-
-The Pulsar C++ client has been successfully tested on **MacOS** and **Linux**.
-
-## System requirements
-
-You need to have the following installed to use the C++ client:
-
-* [CMake](https://cmake.org/)
-* [Boost](http://www.boost.org/)
-* [Protocol Buffers](https://developers.google.com/protocol-buffers/) 2.6
-* [Log4CXX](https://logging.apache.org/log4cxx)
-* [libcurl](https://curl.haxx.se/libcurl/)
-* [Google Test](https://github.com/google/googletest)
-* [JsonCpp](https://github.com/open-source-parsers/jsoncpp)
-
-## Compilation
-
-There are separate compilation instructions for [MacOS](#macos) and [Linux](#linux). For both systems, start by cloning the Pulsar repository:
-
-```shell
-
-$ git clone https://github.com/apache/pulsar
-
-```
-
-### Linux
-
-First, install all of the necessary dependencies:
-
-```shell
-
-$ apt-get install cmake libssl-dev libcurl4-openssl-dev liblog4cxx-dev \
-  libprotobuf-dev protobuf-compiler libboost-all-dev google-mock libgtest-dev libjsoncpp-dev
-
-```
-
-Then compile and install [Google Test](https://github.com/google/googletest):
-
-```shell
-
-# libgtest-dev version is 1.8.0 or above
-$ cd /usr/src/googletest
-$ sudo cmake .
-$ sudo make
-$ sudo cp ./googlemock/libgmock.a ./googlemock/gtest/libgtest.a /usr/lib/
-
-# less than 1.8.0
-$ cd /usr/src/gtest
-$ sudo cmake .
-$ sudo make
-$ sudo cp libgtest.a /usr/lib
-
-$ cd /usr/src/gmock
-$ sudo cmake .
-$ sudo make
-$ sudo cp libgmock.a /usr/lib
-
-```
-
-Finally, compile the Pulsar client library for C++ inside the Pulsar repo:
-
-```shell
-
-$ cd pulsar-client-cpp
-$ cmake .
-$ make
-
-```
-
-The resulting files, `libpulsar.so` and `libpulsar.a`, will be placed in the `lib` folder of the repo, while two tools, `perfProducer` and `perfConsumer`, will be placed in the `perf` directory.
-
-### MacOS
-
-First, install all of the necessary dependencies:
-
-```shell
-
-# OpenSSL installation
-$ brew install openssl
-$ export OPENSSL_INCLUDE_DIR=/usr/local/opt/openssl/include/
-$ export OPENSSL_ROOT_DIR=/usr/local/opt/openssl/
-
-# Protocol Buffers installation
-$ brew tap homebrew/versions
-$ brew install protobuf260
-$ brew install boost
-$ brew install log4cxx
-
-# Google Test installation
-$ git clone https://github.com/google/googletest.git
-$ cd googletest
-$ cmake .
-$ make install
-
-```
-
-Then compile the Pulsar client library in the repo that you cloned:
-
-```shell
-
-$ cd pulsar-client-cpp
-$ cmake .
-$ make
-
-```
-
diff --git a/site2/website-next/versioned_docs/version-2.7.2/developing-load-manager.md b/site2/website-next/versioned_docs/version-2.7.2/developing-load-manager.md
deleted file mode 100644
index 509209b..0000000
--- a/site2/website-next/versioned_docs/version-2.7.2/developing-load-manager.md
+++ /dev/null
@@ -1,227 +0,0 @@
----
-id: develop-load-manager
-title: Modular load manager
-sidebar_label: "Modular load manager"
-original_id: develop-load-manager
----
-
-The *modular load manager*, implemented in  [`ModularLoadManagerImpl`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java), is a flexible alternative to the previously implemented load manager, [`SimpleLoadManagerImpl`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java), which attempts to simplify how load  [...]
-
-## Usage
-
-There are two ways that you can enable the modular load manager:
-
-1. Change the value of the `loadManagerClassName` parameter in `conf/broker.conf` from `org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl` to `org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl`.
-2. Using the `pulsar-admin` tool. Here's an example:
-
-   ```shell
-   
-   $ pulsar-admin brokers update-dynamic-config \
-    --config loadManagerClassName \
-    --value org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl
-   
-   ```
-
-   You can use the same method to change back to the original value. In either case, any mistake in specifying the load manager will cause Pulsar to default to `SimpleLoadManagerImpl`.
-
-## Verification
-
-There are a few different ways to determine which load manager is being used:
-
-1. Use `pulsar-admin` to examine the `loadManagerClassName` element:
-
-   ```shell
-   
-   $ bin/pulsar-admin brokers get-all-dynamic-config
-   {
-    "loadManagerClassName" : "org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl"
-   }
-   
-   ```
-
-   If there is no `loadManagerClassName` element, then the default load manager is used.
-
-2. Consult a ZooKeeper load report. With the modular load manager, the load report in `/loadbalance/brokers/...` will have many differences. For example, the `systemResourceUsage` sub-elements (`bandwidthIn`, `bandwidthOut`, etc.) are now all at the top level. Here is an example load report from the modular load manager:
-
-   ```json
-   
-   {
-     "bandwidthIn": {
-       "limit": 10240000.0,
-       "usage": 4.256510416666667
-     },
-     "bandwidthOut": {
-       "limit": 10240000.0,
-       "usage": 5.287239583333333
-     },
-     "bundles": [],
-     "cpu": {
-       "limit": 2400.0,
-       "usage": 5.7353247655435915
-     },
-     "directMemory": {
-       "limit": 16384.0,
-       "usage": 1.0
-     }
-   }
-   
-   ```
-
-   With the simple load manager, the load report in `/loadbalance/brokers/...` will look like this:
-
-   ```json
-   
-   {
-     "systemResourceUsage": {
-       "bandwidthIn": {
-         "limit": 10240000.0,
-         "usage": 0.0
-       },
-       "bandwidthOut": {
-         "limit": 10240000.0,
-         "usage": 0.0
-       },
-       "cpu": {
-         "limit": 2400.0,
-         "usage": 0.0
-       },
-       "directMemory": {
-         "limit": 16384.0,
-         "usage": 1.0
-       },
-       "memory": {
-         "limit": 8192.0,
-         "usage": 3903.0
-       }
-     }
-   }
-   
-   ```
-
-3. The command-line [broker monitor](reference-cli-tools.md#monitor-brokers) will have a different output format depending on which load manager implementation is being used.
-
-   Here is an example from the modular load manager:
-
-   ```
-   
-   ===================================================================================================================
-   ||SYSTEM         |CPU %          |MEMORY %       |DIRECT %       |BW IN %        |BW OUT %       |MAX %          ||
-   ||               |0.00           |48.33          |0.01           |0.00           |0.00           |48.33          ||
-   ||COUNT          |TOPIC          |BUNDLE         |PRODUCER       |CONSUMER       |BUNDLE +       |BUNDLE -       ||
-   ||               |4              |4              |0              |2              |4              |0              ||
-   ||LATEST         |MSG/S IN       |MSG/S OUT      |TOTAL          |KB/S IN        |KB/S OUT       |TOTAL          ||
-   ||               |0.00           |0.00           |0.00           |0.00           |0.00           |0.00           ||
-   ||SHORT          |MSG/S IN       |MSG/S OUT      |TOTAL          |KB/S IN        |KB/S OUT       |TOTAL          ||
-   ||               |0.00           |0.00           |0.00           |0.00           |0.00           |0.00           ||
-   ||LONG           |MSG/S IN       |MSG/S OUT      |TOTAL          |KB/S IN        |KB/S OUT       |TOTAL          ||
-   ||               |0.00           |0.00           |0.00           |0.00           |0.00           |0.00           ||
-   ===================================================================================================================
-   
-   ```
-
-   Here is an example from the simple load manager:
-
-   ```
-   
-   ===================================================================================================================
-   ||COUNT          |TOPIC          |BUNDLE         |PRODUCER       |CONSUMER       |BUNDLE +       |BUNDLE -       ||
-   ||               |4              |4              |0              |2              |0              |0              ||
-   ||RAW SYSTEM     |CPU %          |MEMORY %       |DIRECT %       |BW IN %        |BW OUT %       |MAX %          ||
-   ||               |0.25           |47.94          |0.01           |0.00           |0.00           |47.94          ||
-   ||ALLOC SYSTEM   |CPU %          |MEMORY %       |DIRECT %       |BW IN %        |BW OUT %       |MAX %          ||
-   ||               |0.20           |1.89           |               |1.27           |3.21           |3.21           ||
-   ||RAW MSG        |MSG/S IN       |MSG/S OUT      |TOTAL          |KB/S IN        |KB/S OUT       |TOTAL          ||
-   ||               |0.00           |0.00           |0.00           |0.01           |0.01           |0.01           ||
-   ||ALLOC MSG      |MSG/S IN       |MSG/S OUT      |TOTAL          |KB/S IN        |KB/S OUT       |TOTAL          ||
-   ||               |54.84          |134.48         |189.31         |126.54         |320.96         |447.50         ||
-   ===================================================================================================================
-   
-   ```
-
-It is important to note that the modular load manager is _centralized_, meaning that all requests to assign a bundle---whether it's been seen before or whether this is the first time---only get handled by the _lead_ broker (which can change over time). To determine the current lead broker, examine the `/loadbalance/leader` node in ZooKeeper.
-
-## Implementation
-
-### Data
-
-The data monitored by the modular load manager is contained in the [`LoadData`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/LoadData.java) class.
-Here, the available data is subdivided into the bundle data and the broker data.
-
-#### Broker
-
-The broker data is contained in the [`BrokerData`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/BrokerData.java) class. It is further subdivided into two parts,
-one being the local data which every broker individually writes to ZooKeeper, and the other being the historical broker
-data which is written to ZooKeeper by the leader broker.
-
-##### Local Broker Data
-The local broker data is contained in the class [`LocalBrokerData`](https://github.com/apache/pulsar/blob/master/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java) and provides information about the following resources:
-
-* CPU usage
-* JVM heap memory usage
-* Direct memory usage
-* Bandwidth in/out usage
-* Most recent total message rate in/out across all bundles
-* Total number of topics, bundles, producers, and consumers
-* Names of all bundles assigned to this broker
-* Most recent changes in bundle assignments for this broker
-
-The local broker data is updated periodically according to the service configuration
-`loadBalancerReportUpdateMaxIntervalMinutes`. After any broker updates its local broker data, the leader broker will
-receive the update immediately via a ZooKeeper watch, where the local data is read from the ZooKeeper node
-`/loadbalance/brokers/<broker host/port>`.
-
-##### Historical Broker Data
-
-The historical broker data is contained in the [`TimeAverageBrokerData`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/TimeAverageBrokerData.java) class.
-
-In order to reconcile the need to make good decisions in a steady-state scenario and make reactive decisions in a critical scenario, the historical data is split into two parts: the short-term data for reactive decisions, and the long-term data for steady-state decisions. Both time frames maintain the following information:
-
-* Message rate in/out for the entire broker
-* Message throughput in/out for the entire broker
-
-Unlike the bundle data, the broker data does not maintain samples for the global broker message rates and throughputs, which are not expected to remain steady as new bundles are removed or added. Instead, this data is aggregated over the short-term and long-term data for the bundles. See the section on bundle data to understand how that data is collected and maintained.
-
-The historical broker data is updated for each broker in memory by the leader broker whenever any broker writes their local data to ZooKeeper. Then, the historical data is written to ZooKeeper by the leader broker periodically according to the configuration `loadBalancerResourceQuotaUpdateIntervalMinutes`.
-
-##### Bundle Data
-
-The bundle data is contained in the [`BundleData`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/BundleData.java) class. Like the historical broker data, the bundle data is split into a short-term and a long-term time frame. The information maintained in each time frame is:
-
-* Message rate in/out for this bundle
-* Message Throughput In/Out for this bundle
-* Current number of samples for this bundle
-
-The time frames are implemented by maintaining the average of these values over a set, limited number of samples, where
-the samples are obtained through the message rate and throughput values in the local data. Thus, if the update interval
-for the local data is 2 minutes, the number of short samples is 10 and the number of long samples is 1000, the
-short-term data is maintained over a period of `10 samples * 2 minutes / sample = 20 minutes`, while the long-term
-data is similarly over a period of 2000 minutes. Whenever there are not enough samples to satisfy a given time frame,
-the average is taken only over the existing samples. When no samples are available, default values are assumed until
-they are overwritten by the first sample. Currently, the default values are
-
-* Message rate in/out: 50 messages per second both ways
-* Message throughput in/out: 50KB per second both ways
-
-The bundle data is updated in memory on the leader broker whenever any broker writes their local data to ZooKeeper.
-Then, the bundle data is written to ZooKeeper by the leader broker periodically at the same time as the historical
-broker data, according to the configuration `loadBalancerResourceQuotaUpdateIntervalMinutes`.
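-
-As a rough illustration of the windowed averaging described above (a
-simplified, self-contained sketch, not the actual `BundleData` implementation):
-
-```java
-
-import java.util.ArrayDeque;
-import java.util.Deque;
-
-public class WindowedAverageSketch {
-    private final int maxSamples;
-    private final Deque<Double> samples = new ArrayDeque<>();
-
-    WindowedAverageSketch(int maxSamples) {
-        this.maxSamples = maxSamples;
-    }
-
-    // Add a sample, evicting the oldest one once the window is full.
-    void add(double value) {
-        if (samples.size() == maxSamples) {
-            samples.removeFirst();
-        }
-        samples.addLast(value);
-    }
-
-    // Average over however many samples exist; fall back to the default
-    // (e.g. 50 msg/s) until the first sample arrives.
-    double average(double defaultValue) {
-        if (samples.isEmpty()) {
-            return defaultValue;
-        }
-        return samples.stream().mapToDouble(Double::doubleValue)
-                .average().getAsDouble();
-    }
-}
-
-```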
-
-### Traffic Distribution
-
-The modular load manager uses the abstraction provided by [`ModularLoadManagerStrategy`](https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategy.java) to make decisions about bundle assignment. The strategy makes a decision by considering the service configuration, the entire load data, and the bundle data for the bundle to be assigned. Currently, the only supported strategy is [`LeastLongTermMessageRate`](h [...]
-
-#### Least Long Term Message Rate Strategy
-
-As its name suggests, the least long term message rate strategy attempts to distribute bundles across brokers so that
-the message rate in the long-term time window for each broker is roughly the same. However, simply balancing load based
-on message rate does not handle the issue of asymmetric resource burden per message on each broker. Thus, the system
-resource usages, which are CPU, memory, direct memory, bandwidth in, and bandwidth out, are also considered in the
-assignment process. This is done by weighting the final message rate according to
-`1 / (overload_threshold - max_usage)`, where `overload_threshold` corresponds to the configuration
-`loadBalancerBrokerOverloadedThresholdPercentage` and `max_usage` is the maximum proportion among the system resources
-that is being utilized by the candidate broker. This multiplier ensures that machines that are more heavily taxed
-by the same message rates will receive less load. In particular, it tries to ensure that if one machine is overloaded,
-then all machines are approximately overloaded. In the case in which a broker's max usage exceeds the overload
-threshold, that broker is not considered for bundle assignment. If all brokers are overloaded, the bundle is randomly
-assigned.
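-
-Written out as a simplified sketch (the names here are illustrative, not the
-actual strategy code):
-
-```java
-
-public class RateWeightingSketch {
-    // Weight a broker's long-term message rate by how close it is to the
-    // overload threshold; brokers at or above the threshold are excluded.
-    static double weightedRate(double longTermRate,
-                               double maxUsage,            // 0.0 - 1.0
-                               double overloadThreshold) { // e.g. 0.85
-        if (maxUsage >= overloadThreshold) {
-            return Double.POSITIVE_INFINITY; // not a candidate
-        }
-        return longTermRate / (overloadThreshold - maxUsage);
-    }
-}
-
-```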
-
diff --git a/site2/website-next/versioned_docs/version-2.7.2/developing-tools.md b/site2/website-next/versioned_docs/version-2.7.2/developing-tools.md
deleted file mode 100644
index b545779..0000000
--- a/site2/website-next/versioned_docs/version-2.7.2/developing-tools.md
+++ /dev/null
@@ -1,111 +0,0 @@
----
-id: develop-tools
-title: Simulation tools
-sidebar_label: "Simulation tools"
-original_id: develop-tools
----
-
-It is sometimes necessary to create a test environment and incur artificial load to observe how well load managers
-handle the load. The load simulation controller, the load simulation client, and the broker monitor were created as an
-effort to make creating this load and observing its effects on the managers easier.
-
-## Simulation Client
-The simulation client is a machine which creates and subscribes to topics with configurable message rates and sizes.
-Because simulating a large load sometimes requires multiple client machines, the user does not interact
-with the simulation client directly, but instead delegates requests to the simulation controller, which then
-sends signals to the clients to start incurring load. The client implementation is in the class
-`org.apache.pulsar.testclient.LoadSimulationClient`.
-
-### Usage
-To start a simulation client, use the `pulsar-perf` script with the command `simulation-client` as follows:
-
-```
-
-pulsar-perf simulation-client --port <listen port> --service-url <pulsar service url>
-
-```
-
-The client will then be ready to receive controller commands.
-## Simulation Controller
-The simulation controller sends signals to the simulation clients, requesting them to create new topics, stop old
-topics, change the load incurred by topics, and perform several other tasks. It is implemented in the class
-`org.apache.pulsar.testclient.LoadSimulationController` and presents a shell to the user as an interface to send
-commands with.
-
-### Usage
-To start a simulation controller, use the `pulsar-perf` script with the command `simulation-controller` as follows:
-
-```
-
-pulsar-perf simulation-controller --cluster <cluster to simulate on> --client-port <listen port for clients>
---clients <comma-separated list of client host names>
-
-```
-
-The clients should already be started before the controller is started. You will then be presented with a simple prompt,
-where you can issue commands to simulation clients. Arguments often refer to tenant names, namespace names, and topic
-names. In all cases, the BASE name of the tenants, namespaces, and topics are used. For example, for the topic
-`persistent://my_tenant/my_cluster/my_namespace/my_topic`, the tenant name is `my_tenant`, the namespace name is
-`my_namespace`, and the topic name is `my_topic`. The controller can perform the following actions:
-
-* Create a topic with a producer and a consumer
-  * `trade <tenant> <namespace> <topic> [--rate <message rate per second>]
-  [--rand-rate <lower bound>,<upper bound>]
-  [--size <message size in bytes>]`
-* Create a group of topics with a producer and a consumer
-  * `trade_group <tenant> <group> <num_namespaces> [--rate <message rate per second>]
-  [--rand-rate <lower bound>,<upper bound>]
-  [--separation <separation between creating topics in ms>] [--size <message size in bytes>]
-  [--topics-per-namespace <number of topics to create per namespace>]`
-* Change the configuration of an existing topic
-  * `change <tenant> <namespace> <topic> [--rate <message rate per second>]
-  [--rand-rate <lower bound>,<upper bound>]
-  [--size <message size in bytes>]`
-* Change the configuration of a group of topics
-  * `change_group <tenant> <group> [--rate <message rate per second>] [--rand-rate <lower bound>,<upper bound>]
-  [--size <message size in bytes>] [--topics-per-namespace <number of topics to create per namespace>]`
-* Shutdown a previously created topic
-  * `stop <tenant> <namespace> <topic>`
-* Shutdown a previously created group of topics
-  * `stop_group <tenant> <group>`
-* Copy the historical data from one ZooKeeper to another and simulate based on the message rates and sizes in that history
-  * `copy <tenant> <source zookeeper> <target zookeeper> [--rate-multiplier value]`
-* Simulate the load of the historical data on the current ZooKeeper (should be same ZooKeeper being simulated on)
-  * `simulate <tenant> <zookeeper> [--rate-multiplier value]`
-* Stream the latest data from the given active ZooKeeper to simulate the real-time load of that ZooKeeper.
-  * `stream <tenant> <zookeeper> [--rate-multiplier value]`
-
-The "group" arguments in these commands allow the user to create or affect multiple topics at once. Groups are created
-when calling the `trade_group` command, and all topics from these groups may be subsequently modified or stopped
-with the `change_group` and `stop_group` commands respectively. All ZooKeeper arguments are of the form
-`zookeeper_host:port`.
-
-### Difference Between Copy, Simulate, and Stream
-The commands `copy`, `simulate`, and `stream` are very similar but have significant differences. `copy` is used when
-you want to simulate the load of a static, external ZooKeeper on the ZooKeeper you are simulating on. Thus,
-`source zookeeper` should be the ZooKeeper you want to copy and `target zookeeper` should be the ZooKeeper you are
-simulating on, and then it will get the full benefit of the historical data of the source in both load manager
-implementations. `simulate` on the other hand takes in only one ZooKeeper, the one you are simulating on. It assumes
-that you are simulating on a ZooKeeper that has historical data for `SimpleLoadManagerImpl` and creates equivalent
-historical data for `ModularLoadManagerImpl`. Then, the load according to the historical data is simulated by the
-clients. Finally, `stream` takes in an active ZooKeeper different from the ZooKeeper being simulated on and streams
-load data from it to simulate the real-time load. In all cases, the optional `rate-multiplier` argument allows the
-user to simulate some proportion of the load. For instance, using `--rate-multiplier 0.05` will cause messages to
-be sent at only `5%` of the rate of the load that is being simulated.
-
-## Broker Monitor
-To observe the behavior of the load manager in these simulations, one may use the broker monitor, which is
-implemented in `org.apache.pulsar.testclient.BrokerMonitor`. The broker monitor prints tabular load data to the
-console as the data is updated, using ZooKeeper watches.
-
-### Usage
-To start a broker monitor, use the `monitor-brokers` command in the `pulsar-perf` script:
-
-```
-
-pulsar-perf monitor-brokers --connect-string <zookeeper host:port>
-
-```
-
-The console will then continuously print load data until it is interrupted.
-
diff --git a/site2/website-next/versioned_sidebars/version-2.4.1-sidebars.json b/site2/website-next/versioned_sidebars/version-2.4.1-sidebars.json
new file mode 100644
index 0000000..7b68978
--- /dev/null
+++ b/site2/website-next/versioned_sidebars/version-2.4.1-sidebars.json
@@ -0,0 +1,94 @@
+{
+  "version-2.4.1/docsSidebar": [
+    {
+      "type": "category",
+      "label": "Get Started",
+      "items": [
+        {
+          "type": "doc",
+          "id": "version-2.4.1/pulsar-2.0"
+        },
+        {
+          "type": "doc",
+          "id": "version-2.4.1/standalone"
+        },
+        {
+          "type": "doc",
+          "id": "version-2.4.1/standalone-docker"
+        },
+        {
+          "type": "doc",
+          "id": "version-2.4.1/client-libraries"
+        }
+      ]
+    },
+    {
+      "type": "category",
+      "label": "Concepts and Architecture",
+      "items": [
+        {
+          "type": "doc",
+          "id": "version-2.4.1/concepts-overview"
+        },
+        {
+          "type": "doc",
+          "id": "version-2.4.1/concepts-messaging"
+        },
+        {
+          "type": "doc",
+          "id": "version-2.4.1/concepts-architecture-overview"
+        },
+        {
+          "type": "doc",
+          "id": "version-2.4.1/concepts-clients"
+        },
+        {
+          "type": "doc",
+          "id": "version-2.4.1/concepts-replication"
+        },
+        {
+          "type": "doc",
+          "id": "version-2.4.1/concepts-multi-tenancy"
+        },
+        {
+          "type": "doc",
+          "id": "version-2.4.1/concepts-authentication"
+        },
+        {
+          "type": "doc",
+          "id": "version-2.4.1/concepts-topic-compaction"
+        },
+        {
+          "type": "doc",
+          "id": "version-2.4.1/concepts-tiered-storage"
+        },
+        {
+          "type": "doc",
+          "id": "version-2.4.1/concepts-schema-registry"
+        }
+      ]
+    },
+    {
+      "type": "category",
+      "label": "Pulsar Schema",
+      "items": [
+        {
+          "type": "doc",
+          "id": "version-2.4.1/schema-get-started"
+        },
+        {
+          "type": "doc",
+          "id": "version-2.4.1/schema-understand"
+        },
+        {
+          "type": "doc",
+          "id": "version-2.4.1/schema-evolution-compatibility"
+        },
+        {
+          "type": "doc",
+          "id": "version-2.4.1/schema-manage"
+        }
+      ]
+    }
+  ]
+}
\ No newline at end of file
diff --git a/site2/website-next/versions.json b/site2/website-next/versions.json
index 07316f3..3b1fa35 100644
--- a/site2/website-next/versions.json
+++ b/site2/website-next/versions.json
@@ -13,5 +13,6 @@
   "2.5.1",
   "2.5.0",
   "2.4.2",
+  "2.4.1",
   "2.2.0"
 ]