Posted to commits@pulsar.apache.org by hu...@apache.org on 2022/05/10 09:36:45 UTC

[pulsar] branch master updated: [Improve][Doc]Update Kafka source connector docs (#15495)

This is an automated email from the ASF dual-hosted git repository.

huanlimeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 89958018da5 [Improve][Doc]Update Kafka source connector docs (#15495)
89958018da5 is described below

commit 89958018da5311889215912ce72c5979675186e6
Author: Huanli Meng <48...@users.noreply.github.com>
AuthorDate: Tue May 10 17:36:39 2022 +0800

    [Improve][Doc]Update Kafka source connector docs (#15495)
    
    * Update kafka source connector docs
    
    * Apply tech review comments
---
 site2/docs/io-kafka-source.md                      | 152 +++++++++++----------
 .../version-2.10.0/io-kafka-source.md              | 152 +++++++++++----------
 .../version-2.9.2/io-kafka-source.md               | 150 ++++++++++----------
 3 files changed, 243 insertions(+), 211 deletions(-)

diff --git a/site2/docs/io-kafka-source.md b/site2/docs/io-kafka-source.md
index 1bce28381d6..3b34b18127e 100644
--- a/site2/docs/io-kafka-source.md
+++ b/site2/docs/io-kafka-source.md
@@ -17,18 +17,18 @@ The configuration of the Kafka source connector has the following properties.
 
 | Name | Type| Required | Default | Description 
 |------|----------|---------|-------------|-------------|
-| `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. |
+| `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. |
 | `groupId` |String| true | " " (empty string) | A unique string that identifies the group of consumer processes to which this consumer belongs. |
 | `fetchMinBytes` | long|false | 1 | The minimum byte expected for each fetch response. |
 | `autoCommitEnabled` | boolean |false | true | If set to true, the consumer's offset is periodically committed in the background.<br/><br/> This committed offset is used when the process fails as the position from which a new consumer begins. |
 | `autoCommitIntervalMs` | long|false | 5000 | The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if `autoCommitEnabled` is set to true. |
 | `heartbeatIntervalMs` | long| false | 3000 | The interval between heartbeats to the consumer when using Kafka's group management facilities. <br/><br/>**Note: `heartbeatIntervalMs` must be smaller than `sessionTimeoutMs`**.|
 | `sessionTimeoutMs` | long|false | 30000 | The timeout used to detect consumer failures when using Kafka's group management facility. |
-| `topic` | String|true | " " (empty string)| The Kafka topic which sends messages to Pulsar. |
-| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer configuration properties to be passed to consumers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. |
+| `topic` | String|true | " " (empty string)| The Kafka topic that sends messages to Pulsar. |
+| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer configuration properties to be passed to consumers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. |
 | `keyDeserializationClass` | String|false | org.apache.kafka.common.serialization.StringDeserializer | The deserializer class for Kafka consumers to deserialize keys.<br/> The deserializer is set by a specific implementation of [`KafkaAbstractSource`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java).
 | `valueDeserializationClass` | String|false | org.apache.kafka.common.serialization.ByteArrayDeserializer | The deserializer class for Kafka consumers to deserialize values.
-| `autoOffsetReset` | String | false | "earliest" | The default offset reset policy. |
+| `autoOffsetReset` | String | false | earliest | The default offset reset policy. |
 
 ### Schema Management
 
@@ -64,83 +64,65 @@ If you want to access the raw key, you can use the `Message#getKeyBytes()` API.
 
 Before using the Kafka source connector, you need to create a configuration file through one of the following methods.
 
-* JSON 
+- JSON
 
-    ```json
+   ```json
     {
-       "configs": {
-          "bootstrapServers": "pulsar-kafka:9092",
-          "groupId": "test-pulsar-io",
-          "topic": "my-topic",
-          "sessionTimeoutMs": "10000",
-          "autoCommitEnabled": false
-       }
+      "bootstrapServers": "pulsar-kafka:9092",
+      "groupId": "test-pulsar-io",
+      "topic": "my-topic",
+      "sessionTimeoutMs": "10000",
+      "autoCommitEnabled": false
     }
-    ```
+   ```
 
-* YAML
+- YAML
 
-    ```yaml
+   ```yaml
     configs:
-        bootstrapServers: "pulsar-kafka:9092"
-        groupId: "test-pulsar-io"
-        topic: "my-topic"
-        sessionTimeoutMs: "10000"
+      bootstrapServers: "pulsar-kafka:9092"
+      groupId: "test-pulsar-io"
+      topic: "my-topic"
+      sessionTimeoutMs: "10000"
-        autoCommitEnabled: false
+      autoCommitEnabled: false
     ```
 
 ## Usage
 
-Here is an example of using the Kafka source connector with the configuration file as shown previously.
+You can use the Kafka source connector as a Pulsar built-in connector on either a standalone cluster or an on-premises cluster.
 
-1. Download a Kafka client and a Kafka connector.
+### Standalone cluster
 
-    ```bash
-    $ wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.2.1/kafka-clients-0.10.2.1.jar
+This example describes how to use the Kafka source connector to feed data from Kafka and write it to Pulsar topics in standalone mode.
 
-    $ wget https://archive.apache.org/dist/pulsar/pulsar-2.4.0/connectors/pulsar-io-kafka-2.4.0.nar
-    ```
+#### Prerequisites
 
-2. Create a network.
-   
-   ```bash
-   $ docker network create kafka-pulsar
-   ```
+- Install [Docker](https://docs.docker.com/get-docker/) (Community Edition).
 
-3. Pull a ZooKeeper image and start ZooKeeper.
-   
-   ```bash
-   $ docker pull wurstmeister/zookeeper
+#### Steps
 
-   $ docker run -d -it -p 2181:2181 --name pulsar-kafka-zookeeper --network kafka-pulsar wurstmeister/zookeeper
-   ```
+1. Download and start the Confluent Platform.
 
-4. Pull a Kafka image and start Kafka.
-   
-   ```bash
-   $ docker pull wurstmeister/kafka:2.11-1.0.2
-   
-   $ docker run -d -it --network kafka-pulsar -p 6667:6667 -p 9092:9092 -e KAFKA_ADVERTISED_HOST_NAME=pulsar-kafka -e KAFKA_ZOOKEEPER_CONNECT=pulsar-kafka-zookeeper:2181 --name pulsar-kafka wurstmeister/kafka:2.11-1.0.2
-   ```
+   For details on installing the Kafka service locally, see the [Confluent documentation](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-1-download-and-start-cp).
 
-5. Pull a Pulsar image and start Pulsar standalone.
-   
-   ```bash
-   $ docker pull apachepulsar/pulsar:{{pulsar:version}}
-   
-   $ docker run -d -it --network kafka-pulsar -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-kafka-standalone apachepulsar/pulsar:2.4.0 bin/pulsar standalone
-   ```
+2. Pull a Pulsar image and start Pulsar in standalone mode.
+
+    ```bash
+    docker pull apachepulsar/pulsar:latest
+      
+    docker run -d -it -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-kafka-standalone apachepulsar/pulsar:latest bin/pulsar standalone
+    ```
+
+3. Create a producer file _kafka-producer.py_.
 
-6. Create a producer file _kafka-producer.py_.
-   
    ```python
    from kafka import KafkaProducer
-   producer = KafkaProducer(bootstrap_servers='pulsar-kafka:9092')
+   producer = KafkaProducer(bootstrap_servers='localhost:9092')
    future = producer.send('my-topic', b'hello world')
    future.get()
    ```
 
-7. Create a consumer file _pulsar-client.py_.
+4. Create a consumer file _pulsar-client.py_.
 
     ```python
     import pulsar
@@ -158,23 +140,20 @@ Here is an example of using the Kafka source connector with the configuration fi
     client.close()
     ```
 
-8. Copy the following files to Pulsar.
-   
+5. Copy the following files to Pulsar.
+  
     ```bash
-    $ docker cp pulsar-io-kafka-{{pulsar:version}}.nar pulsar-kafka-standalone:/pulsar
-    $ docker cp kafkaSourceConfig.yaml pulsar-kafka-standalone:/pulsar/conf
-    $ docker cp pulsar-client.py pulsar-kafka-standalone:/pulsar/
-    $ docker cp kafka-producer.py pulsar-kafka-standalone:/pulsar/
+    docker cp pulsar-io-kafka.nar pulsar-kafka-standalone:/pulsar
+    docker cp kafkaSourceConfig.yaml pulsar-kafka-standalone:/pulsar/conf
     ```
 
-9. Open a new terminal window and start the Kafka source connector in local run mode. 
+6. Open a new terminal window and start the Kafka source connector in local run mode.
 
     ```bash
-    $ docker exec -it pulsar-kafka-standalone /bin/bash
+    docker exec -it pulsar-kafka-standalone /bin/bash
 
-    $ ./bin/pulsar-admin source localrun \
-    --archive ./pulsar-io-kafka-{{pulsar:version}}.nar \
-    --classname org.apache.pulsar.io.kafka.KafkaBytesSource \
+    ./bin/pulsar-admin source localrun \
+    --archive ./pulsar-io-kafka.nar \
     --tenant public \
     --namespace default \
     --name kafka \
@@ -183,18 +162,49 @@ Here is an example of using the Kafka source connector with the configuration fi
     --parallelism 1
     ```
 
-10. Open a new terminal window and run the consumer.
+7. Open a new terminal window and run the Kafka producer locally.
 
     ```bash
-    $ docker exec -it pulsar-kafka-standalone /bin/bash
+    python3 kafka-producer.py
+    ```
 
-    $ pip install kafka-python
+8. Open a new terminal window and run the Pulsar consumer locally.
 
-    $ python3 kafka-producer.py
+    ```bash
+    python3 pulsar-client.py
     ```
 
-    The following information appears on the consumer terminal window.
+    The following information appears in the consumer terminal window.
 
     ```bash
     Received message: 'hello world'
     ```
+
+### On-premises cluster
+
+This example explains how to create a Kafka source connector in an on-premises cluster.
+
+1. Copy the NAR package of the Kafka connector to the Pulsar connectors directory.
+
+    ```bash
+    cp pulsar-io-kafka-{{connector:version}}.nar $PULSAR_HOME/connectors/pulsar-io-kafka-{{connector:version}}.nar
+    ```
+
+2. Reload all [built-in connectors](https://pulsar.apache.org/docs/en/next/io-connectors/).
+
+    ```bash
+    $PULSAR_HOME/bin/pulsar-admin sources reload
+    ```
+
+3. Check that the Kafka source connector is available in the list.
+
+    ```bash
+    $PULSAR_HOME/bin/pulsar-admin sources available-sources
+    ```
+
+4. Create a Kafka source connector on a Pulsar cluster using the [`pulsar-admin sources create`](http://pulsar.apache.org/tools/pulsar-admin/2.11.0-SNAPSHOT/#-em-create-em--14) command.
+
+    ```bash
+    $PULSAR_HOME/bin/pulsar-admin sources create \
+    --source-config-file <kafka-source-config.yaml>
+    ```
\ No newline at end of file
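The property table above documents defaults for the connector settings and one hard constraint (`heartbeatIntervalMs` must be smaller than `sessionTimeoutMs`). A minimal sketch of checking a connector config against those documented rules — the `validate_kafka_source_config` helper is hypothetical and not part of Pulsar; only the property names, defaults, and the constraint come from the table:

```python
# Defaults as documented in the io-kafka-source.md property table.
DEFAULTS = {
    "fetchMinBytes": 1,
    "autoCommitEnabled": True,
    "autoCommitIntervalMs": 5000,
    "heartbeatIntervalMs": 3000,
    "sessionTimeoutMs": 30000,
    "autoOffsetReset": "earliest",
}

# Settings the table marks as required.
REQUIRED = ("bootstrapServers", "groupId", "topic")


def validate_kafka_source_config(configs: dict) -> dict:
    """Merge user-supplied settings over the documented defaults and
    enforce the constraints called out in the table. Illustrative only."""
    merged = {**DEFAULTS, **configs}
    missing = [key for key in REQUIRED if not merged.get(key)]
    if missing:
        raise ValueError(f"missing required settings: {missing}")
    # The table notes heartbeatIntervalMs must be smaller than sessionTimeoutMs.
    if int(merged["heartbeatIntervalMs"]) >= int(merged["sessionTimeoutMs"]):
        raise ValueError("heartbeatIntervalMs must be smaller than sessionTimeoutMs")
    return merged


# The YAML example from the docs, expressed as a dict.
cfg = validate_kafka_source_config({
    "bootstrapServers": "pulsar-kafka:9092",
    "groupId": "test-pulsar-io",
    "topic": "my-topic",
    "sessionTimeoutMs": "10000",
    "autoCommitEnabled": False,
})
```

Note that the default heartbeat of 3000 ms still satisfies the constraint against the example's lowered 10000 ms session timeout.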
diff --git a/site2/website/versioned_docs/version-2.10.0/io-kafka-source.md b/site2/website/versioned_docs/version-2.10.0/io-kafka-source.md
index 21611696648..d0d779786f0 100644
--- a/site2/website/versioned_docs/version-2.10.0/io-kafka-source.md
+++ b/site2/website/versioned_docs/version-2.10.0/io-kafka-source.md
@@ -18,18 +18,18 @@ The configuration of the Kafka source connector has the following properties.
 
 | Name | Type| Required | Default | Description 
 |------|----------|---------|-------------|-------------|
-| `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. |
+| `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. |
 | `groupId` |String| true | " " (empty string) | A unique string that identifies the group of consumer processes to which this consumer belongs. |
 | `fetchMinBytes` | long|false | 1 | The minimum byte expected for each fetch response. |
 | `autoCommitEnabled` | boolean |false | true | If set to true, the consumer's offset is periodically committed in the background.<br/><br/> This committed offset is used when the process fails as the position from which a new consumer begins. |
 | `autoCommitIntervalMs` | long|false | 5000 | The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if `autoCommitEnabled` is set to true. |
 | `heartbeatIntervalMs` | long| false | 3000 | The interval between heartbeats to the consumer when using Kafka's group management facilities. <br/><br/>**Note: `heartbeatIntervalMs` must be smaller than `sessionTimeoutMs`**.|
 | `sessionTimeoutMs` | long|false | 30000 | The timeout used to detect consumer failures when using Kafka's group management facility. |
-| `topic` | String|true | " " (empty string)| The Kafka topic which sends messages to Pulsar. |
-| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer configuration properties to be passed to consumers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. |
+| `topic` | String|true | " " (empty string)| The Kafka topic that sends messages to Pulsar. |
+| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer configuration properties to be passed to consumers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. |
 | `keyDeserializationClass` | String|false | org.apache.kafka.common.serialization.StringDeserializer | The deserializer class for Kafka consumers to deserialize keys.<br/> The deserializer is set by a specific implementation of [`KafkaAbstractSource`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java).
 | `valueDeserializationClass` | String|false | org.apache.kafka.common.serialization.ByteArrayDeserializer | The deserializer class for Kafka consumers to deserialize values.
-| `autoOffsetReset` | String | false | "earliest" | The default offset reset policy. |
+| `autoOffsetReset` | String | false | earliest | The default offset reset policy. |
 
 ### Schema Management
 
@@ -65,83 +65,65 @@ If you want to access the raw key, you can use the `Message#getKeyBytes()` API.
 
 Before using the Kafka source connector, you need to create a configuration file through one of the following methods.
 
-* JSON 
+- JSON
 
-    ```json
+   ```json
     {
-       "configs": {
-          "bootstrapServers": "pulsar-kafka:9092",
-          "groupId": "test-pulsar-io",
-          "topic": "my-topic",
-          "sessionTimeoutMs": "10000",
-          "autoCommitEnabled": false
-       }
+      "bootstrapServers": "pulsar-kafka:9092",
+      "groupId": "test-pulsar-io",
+      "topic": "my-topic",
+      "sessionTimeoutMs": "10000",
+      "autoCommitEnabled": false
     }
-    ```
+   ```
 
-* YAML
+- YAML
 
-    ```yaml
+   ```yaml
     configs:
-        bootstrapServers: "pulsar-kafka:9092"
-        groupId: "test-pulsar-io"
-        topic: "my-topic"
-        sessionTimeoutMs: "10000"
+      bootstrapServers: "pulsar-kafka:9092"
+      groupId: "test-pulsar-io"
+      topic: "my-topic"
+      sessionTimeoutMs: "10000"
-        autoCommitEnabled: false
+      autoCommitEnabled: false
     ```
 
 ## Usage
 
-Here is an example of using the Kafka source connector with the configuration file as shown previously.
+You can use the Kafka source connector as a Pulsar built-in connector on either a standalone cluster or an on-premises cluster.
 
-1. Download a Kafka client and a Kafka connector.
+### Standalone cluster
 
-    ```bash
-    $ wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.2.1/kafka-clients-0.10.2.1.jar
+This example describes how to use the Kafka source connector to feed data from Kafka and write it to Pulsar topics in standalone mode.
 
-    $ wget https://archive.apache.org/dist/pulsar/pulsar-2.4.0/connectors/pulsar-io-kafka-2.4.0.nar
-    ```
+#### Prerequisites
 
-2. Create a network.
-   
-   ```bash
-   $ docker network create kafka-pulsar
-   ```
+- Install [Docker](https://docs.docker.com/get-docker/) (Community Edition).
 
-3. Pull a ZooKeeper image and start ZooKeeper.
-   
-   ```bash
-   $ docker pull wurstmeister/zookeeper
+#### Steps
 
-   $ docker run -d -it -p 2181:2181 --name pulsar-kafka-zookeeper --network kafka-pulsar wurstmeister/zookeeper
-   ```
+1. Download and start the Confluent Platform.
 
-4. Pull a Kafka image and start Kafka.
-   
-   ```bash
-   $ docker pull wurstmeister/kafka:2.11-1.0.2
-   
-   $ docker run -d -it --network kafka-pulsar -p 6667:6667 -p 9092:9092 -e KAFKA_ADVERTISED_HOST_NAME=pulsar-kafka -e KAFKA_ZOOKEEPER_CONNECT=pulsar-kafka-zookeeper:2181 --name pulsar-kafka wurstmeister/kafka:2.11-1.0.2
-   ```
+   For details on installing the Kafka service locally, see the [Confluent documentation](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-1-download-and-start-cp).
 
-5. Pull a Pulsar image and start Pulsar standalone.
-   
-   ```bash
-   $ docker pull apachepulsar/pulsar:{{pulsar:version}}
-   
-   $ docker run -d -it --network kafka-pulsar -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-kafka-standalone apachepulsar/pulsar:2.4.0 bin/pulsar standalone
-   ```
+2. Pull a Pulsar image and start Pulsar in standalone mode.
+
+    ```bash
+    docker pull apachepulsar/pulsar:latest
+      
+    docker run -d -it -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-kafka-standalone apachepulsar/pulsar:latest bin/pulsar standalone
+    ```
+
+3. Create a producer file _kafka-producer.py_.
 
-6. Create a producer file _kafka-producer.py_.
-   
    ```python
    from kafka import KafkaProducer
-   producer = KafkaProducer(bootstrap_servers='pulsar-kafka:9092')
+   producer = KafkaProducer(bootstrap_servers='localhost:9092')
    future = producer.send('my-topic', b'hello world')
    future.get()
    ```
 
-7. Create a consumer file _pulsar-client.py_.
+4. Create a consumer file _pulsar-client.py_.
 
     ```python
     import pulsar
@@ -159,23 +141,20 @@ Here is an example of using the Kafka source connector with the configuration fi
     client.close()
     ```
 
-8. Copy the following files to Pulsar.
-   
+5. Copy the following files to Pulsar.
+  
     ```bash
-    $ docker cp pulsar-io-kafka-{{pulsar:version}}.nar pulsar-kafka-standalone:/pulsar
-    $ docker cp kafkaSourceConfig.yaml pulsar-kafka-standalone:/pulsar/conf
-    $ docker cp pulsar-client.py pulsar-kafka-standalone:/pulsar/
-    $ docker cp kafka-producer.py pulsar-kafka-standalone:/pulsar/
+    docker cp pulsar-io-kafka.nar pulsar-kafka-standalone:/pulsar
+    docker cp kafkaSourceConfig.yaml pulsar-kafka-standalone:/pulsar/conf
     ```
 
-9. Open a new terminal window and start the Kafka source connector in local run mode. 
+6. Open a new terminal window and start the Kafka source connector in local run mode.
 
     ```bash
-    $ docker exec -it pulsar-kafka-standalone /bin/bash
+    docker exec -it pulsar-kafka-standalone /bin/bash
 
-    $ ./bin/pulsar-admin source localrun \
-    --archive ./pulsar-io-kafka-{{pulsar:version}}.nar \
-    --classname org.apache.pulsar.io.kafka.KafkaBytesSource \
+    ./bin/pulsar-admin source localrun \
+    --archive ./pulsar-io-kafka.nar \
     --tenant public \
     --namespace default \
     --name kafka \
@@ -184,18 +163,49 @@ Here is an example of using the Kafka source connector with the configuration fi
     --parallelism 1
     ```
 
-10. Open a new terminal window and run the consumer.
+7. Open a new terminal window and run the Kafka producer locally.
 
     ```bash
-    $ docker exec -it pulsar-kafka-standalone /bin/bash
+    python3 kafka-producer.py
+    ```
 
-    $ pip install kafka-python
+8. Open a new terminal window and run the Pulsar consumer locally.
 
-    $ python3 kafka-producer.py
+    ```bash
+    python3 pulsar-client.py
     ```
 
-    The following information appears on the consumer terminal window.
+    The following information appears in the consumer terminal window.
 
     ```bash
     Received message: 'hello world'
     ```
+
+### On-premises cluster
+
+This example explains how to create a Kafka source connector in an on-premises cluster.
+
+1. Copy the NAR package of the Kafka connector to the Pulsar connectors directory.
+
+    ```bash
+    cp pulsar-io-kafka-{{connector:version}}.nar $PULSAR_HOME/connectors/pulsar-io-kafka-{{connector:version}}.nar
+    ```
+
+2. Reload all [built-in connectors](https://pulsar.apache.org/docs/en/next/io-connectors/).
+
+    ```bash
+    $PULSAR_HOME/bin/pulsar-admin sources reload
+    ```
+
+3. Check that the Kafka source connector is available in the list.
+
+    ```bash
+    $PULSAR_HOME/bin/pulsar-admin sources available-sources
+    ```
+
+4. Create a Kafka source connector on a Pulsar cluster using the [`pulsar-admin sources create`](http://pulsar.apache.org/tools/pulsar-admin/2.10.0-SNAPSHOT/#-em-create-em--14) command.
+
+    ```bash
+    $PULSAR_HOME/bin/pulsar-admin sources create \
+    --source-config-file <kafka-source-config.yaml>
+    ```
\ No newline at end of file
diff --git a/site2/website/versioned_docs/version-2.9.2/io-kafka-source.md b/site2/website/versioned_docs/version-2.9.2/io-kafka-source.md
index 71122fdf930..926576d5aa7 100644
--- a/site2/website/versioned_docs/version-2.9.2/io-kafka-source.md
+++ b/site2/website/versioned_docs/version-2.9.2/io-kafka-source.md
@@ -18,18 +18,18 @@ The configuration of the Kafka source connector has the following properties.
 
 | Name | Type| Required | Default | Description 
 |------|----------|---------|-------------|-------------|
-| `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. |
+| `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. |
 | `groupId` |String| true | " " (empty string) | A unique string that identifies the group of consumer processes to which this consumer belongs. |
 | `fetchMinBytes` | long|false | 1 | The minimum byte expected for each fetch response. |
 | `autoCommitEnabled` | boolean |false | true | If set to true, the consumer's offset is periodically committed in the background.<br/><br/> This committed offset is used when the process fails as the position from which a new consumer begins. |
 | `autoCommitIntervalMs` | long|false | 5000 | The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if `autoCommitEnabled` is set to true. |
 | `heartbeatIntervalMs` | long| false | 3000 | The interval between heartbeats to the consumer when using Kafka's group management facilities. <br/><br/>**Note: `heartbeatIntervalMs` must be smaller than `sessionTimeoutMs`**.|
 | `sessionTimeoutMs` | long|false | 30000 | The timeout used to detect consumer failures when using Kafka's group management facility. |
-| `topic` | String|true | " " (empty string)| The Kafka topic which sends messages to Pulsar. |
-| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer configuration properties to be passed to consumers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. |
+| `topic` | String|true | " " (empty string)| The Kafka topic that sends messages to Pulsar. |
+| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer configuration properties to be passed to consumers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. |
 | `keyDeserializationClass` | String|false | org.apache.kafka.common.serialization.StringDeserializer | The deserializer class for Kafka consumers to deserialize keys.<br/> The deserializer is set by a specific implementation of [`KafkaAbstractSource`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java).
 | `valueDeserializationClass` | String|false | org.apache.kafka.common.serialization.ByteArrayDeserializer | The deserializer class for Kafka consumers to deserialize values.
-| `autoOffsetReset` | String | false | "earliest" | The default offset reset policy. |
+| `autoOffsetReset` | String | false | earliest | The default offset reset policy. |
 
 ### Schema Management
 
@@ -65,81 +65,65 @@ If you want to access the raw key, you can use the `Message#getKeyBytes()` API.
 
 Before using the Kafka source connector, you need to create a configuration file through one of the following methods.
 
-* JSON 
+- JSON
 
-    ```json
+   ```json
     {
-        "bootstrapServers": "pulsar-kafka:9092",
-        "groupId": "test-pulsar-io",
-        "topic": "my-topic",
-        "sessionTimeoutMs": "10000",
-        "autoCommitEnabled": false
+      "bootstrapServers": "pulsar-kafka:9092",
+      "groupId": "test-pulsar-io",
+      "topic": "my-topic",
+      "sessionTimeoutMs": "10000",
+      "autoCommitEnabled": false
     }
-    ```
+   ```
 
-* YAML
+- YAML
 
-    ```yaml
+   ```yaml
     configs:
-        bootstrapServers: "pulsar-kafka:9092"
-        groupId: "test-pulsar-io"
-        topic: "my-topic"
-        sessionTimeoutMs: "10000"
+      bootstrapServers: "pulsar-kafka:9092"
+      groupId: "test-pulsar-io"
+      topic: "my-topic"
+      sessionTimeoutMs: "10000"
-        autoCommitEnabled: false
+      autoCommitEnabled: false
     ```
 
 ## Usage
 
-Here is an example of using the Kafka source connector with the configuration file as shown previously.
+You can use the Kafka source connector as a Pulsar built-in connector on either a standalone cluster or an on-premises cluster.
 
-1. Download a Kafka client and a Kafka connector.
+### Standalone cluster
 
-    ```bash
-    $ wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.2.1/kafka-clients-0.10.2.1.jar
+This example describes how to use the Kafka source connector to feed data from Kafka and write it to Pulsar topics in standalone mode.
 
-    $ wget https://archive.apache.org/dist/pulsar/pulsar-2.4.0/connectors/pulsar-io-kafka-2.4.0.nar
-    ```
+#### Prerequisites
 
-2. Create a network.
-   
-   ```bash
-   $ docker network create kafka-pulsar
-   ```
+- Install [Docker](https://docs.docker.com/get-docker/) (Community Edition).
 
-3. Pull a ZooKeeper image and start ZooKeeper.
-   
-   ```bash
-   $ docker pull wurstmeister/zookeeper
+#### Steps
 
-   $ docker run -d -it -p 2181:2181 --name pulsar-kafka-zookeeper --network kafka-pulsar wurstmeister/zookeeper
-   ```
+1. Download and start the Confluent Platform.
 
-4. Pull a Kafka image and start Kafka.
-   
-   ```bash
-   $ docker pull wurstmeister/kafka:2.11-1.0.2
-   
-   $ docker run -d -it --network kafka-pulsar -p 6667:6667 -p 9092:9092 -e KAFKA_ADVERTISED_HOST_NAME=pulsar-kafka -e KAFKA_ZOOKEEPER_CONNECT=pulsar-kafka-zookeeper:2181 --name pulsar-kafka wurstmeister/kafka:2.11-1.0.2
-   ```
+   For details on installing the Kafka service locally, see the [Confluent documentation](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-1-download-and-start-cp).
 
-5. Pull a Pulsar image and start Pulsar standalone.
-   
-   ```bash
-   $ docker pull apachepulsar/pulsar:{{pulsar:version}}
-   
-   $ docker run -d -it --network kafka-pulsar -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-kafka-standalone apachepulsar/pulsar:2.4.0 bin/pulsar standalone
-   ```
+2. Pull a Pulsar image and start Pulsar in standalone mode.
+
+    ```bash
+    docker pull apachepulsar/pulsar:latest
+      
+    docker run -d -it -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-kafka-standalone apachepulsar/pulsar:latest bin/pulsar standalone
+    ```
+
+3. Create a producer file _kafka-producer.py_.
 
-6. Create a producer file _kafka-producer.py_.
-   
    ```python
    from kafka import KafkaProducer
-   producer = KafkaProducer(bootstrap_servers='pulsar-kafka:9092')
+   producer = KafkaProducer(bootstrap_servers='localhost:9092')
    future = producer.send('my-topic', b'hello world')
    future.get()
    ```
 
-7. Create a consumer file _pulsar-client.py_.
+4. Create a consumer file _pulsar-client.py_.
 
     ```python
     import pulsar
@@ -157,23 +141,20 @@ Here is an example of using the Kafka source connector with the configuration fi
     client.close()
     ```
 
-8. Copy the following files to Pulsar.
-   
+5. Copy the following files to Pulsar.
+  
     ```bash
-    $ docker cp pulsar-io-kafka-{{pulsar:version}}.nar pulsar-kafka-standalone:/pulsar
-    $ docker cp kafkaSourceConfig.yaml pulsar-kafka-standalone:/pulsar/conf
-    $ docker cp pulsar-client.py pulsar-kafka-standalone:/pulsar/
-    $ docker cp kafka-producer.py pulsar-kafka-standalone:/pulsar/
+    docker cp pulsar-io-kafka.nar pulsar-kafka-standalone:/pulsar
+    docker cp kafkaSourceConfig.yaml pulsar-kafka-standalone:/pulsar/conf
     ```
 
-9. Open a new terminal window and start the Kafka source connector in local run mode. 
+6. Open a new terminal window and start the Kafka source connector in local run mode.
 
     ```bash
-    $ docker exec -it pulsar-kafka-standalone /bin/bash
+    docker exec -it pulsar-kafka-standalone /bin/bash
 
-    $ ./bin/pulsar-admin source localrun \
-    --archive ./pulsar-io-kafka-{{pulsar:version}}.nar \
-    --classname org.apache.pulsar.io.kafka.KafkaBytesSource \
+    ./bin/pulsar-admin source localrun \
+    --archive ./pulsar-io-kafka.nar \
     --tenant public \
     --namespace default \
     --name kafka \
@@ -182,18 +163,49 @@ Here is an example of using the Kafka source connector with the configuration fi
     --parallelism 1
     ```
 
-10. Open a new terminal window and run the consumer.
+7. Open a new terminal window and run the Kafka producer locally.
 
     ```bash
-    $ docker exec -it pulsar-kafka-standalone /bin/bash
+    python3 kafka-producer.py
+    ```
 
-    $ pip install kafka-python
+8. Open a new terminal window and run the Pulsar consumer locally.
 
-    $ python3 kafka-producer.py
+    ```bash
+    python3 pulsar-client.py
     ```
 
-    The following information appears on the consumer terminal window.
+    The following information appears in the consumer terminal window.
 
     ```bash
     Received message: 'hello world'
     ```
+
+### On-premises cluster
+
+This example explains how to create a Kafka source connector in an on-premises cluster.
+
+1. Copy the NAR package of the Kafka connector to the Pulsar connectors directory.
+
+    ```bash
+    cp pulsar-io-kafka-{{connector:version}}.nar $PULSAR_HOME/connectors/pulsar-io-kafka-{{connector:version}}.nar
+    ```
+
+2. Reload all [built-in connectors](https://pulsar.apache.org/docs/en/next/io-connectors/).
+
+    ```bash
+    $PULSAR_HOME/bin/pulsar-admin sources reload
+    ```
+
+3. Check that the Kafka source connector is available in the list.
+
+    ```bash
+    $PULSAR_HOME/bin/pulsar-admin sources available-sources
+    ```
+
+4. Create a Kafka source connector on a Pulsar cluster using the [`pulsar-admin sources create`](http://pulsar.apache.org/tools/pulsar-admin/2.9.0-SNAPSHOT/#-em-create-em--14) command.
+
+    ```bash
+    $PULSAR_HOME/bin/pulsar-admin sources create \
+    --source-config-file <kafka-source-config.yaml>
+    ```
\ No newline at end of file
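The property table notes that settings specified at the top level of the connector configuration take precedence over entries in `consumerConfigProperties`. A sketch of that merge order; the `effective_consumer_properties` helper and the connector-to-Kafka key mapping are illustrative assumptions, not Pulsar's actual implementation (the Kafka keys themselves, such as `group.id` and `bootstrap.servers`, are standard Kafka consumer settings):

```python
def effective_consumer_properties(connector_config: dict) -> dict:
    """Apply consumerConfigProperties first, then let top-level connector
    settings override them, matching the documented precedence."""
    props = dict(connector_config.get("consumerConfigProperties") or {})
    # Representative subset of connector settings and the Kafka consumer
    # keys they control (illustrative mapping).
    mapping = {
        "bootstrapServers": "bootstrap.servers",
        "groupId": "group.id",
        "fetchMinBytes": "fetch.min.bytes",
        "autoCommitEnabled": "enable.auto.commit",
        "sessionTimeoutMs": "session.timeout.ms",
    }
    for connector_key, kafka_key in mapping.items():
        if connector_key in connector_config:
            # Top-level connector setting wins over consumerConfigProperties.
            props[kafka_key] = connector_config[connector_key]
    return props


props = effective_consumer_properties({
    "bootstrapServers": "pulsar-kafka:9092",
    "groupId": "test-pulsar-io",
    "consumerConfigProperties": {
        "group.id": "ignored-group",   # overridden by the top-level groupId
        "max.poll.records": 500,       # passed through untouched
    },
})
```

Here `group.id` ends up as `test-pulsar-io`, while `max.poll.records` survives because no top-level setting shadows it.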