Posted to commits@pulsar.apache.org by hu...@apache.org on 2022/05/10 09:36:45 UTC
[pulsar] branch master updated: [Improve][Doc]Update Kafka source connector docs (#15495)
This is an automated email from the ASF dual-hosted git repository.
huanlimeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 89958018da5 [Improve][Doc]Update Kafka source connector docs (#15495)
89958018da5 is described below
commit 89958018da5311889215912ce72c5979675186e6
Author: Huanli Meng <48...@users.noreply.github.com>
AuthorDate: Tue May 10 17:36:39 2022 +0800
[Improve][Doc]Update Kafka source connector docs (#15495)
* Update kafka source connector docs
* Apply tech review comments
---
site2/docs/io-kafka-source.md | 152 +++++++++++----------
.../version-2.10.0/io-kafka-source.md | 152 +++++++++++----------
.../version-2.9.2/io-kafka-source.md | 150 ++++++++++----------
3 files changed, 243 insertions(+), 211 deletions(-)
diff --git a/site2/docs/io-kafka-source.md b/site2/docs/io-kafka-source.md
index 1bce28381d6..3b34b18127e 100644
--- a/site2/docs/io-kafka-source.md
+++ b/site2/docs/io-kafka-source.md
@@ -17,18 +17,18 @@ The configuration of the Kafka source connector has the following properties.
| Name | Type| Required | Default | Description
|------|----------|---------|-------------|-------------|
-| `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. |
+| `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. |
| `groupId` |String| true | " " (empty string) | A unique string that identifies the group of consumer processes to which this consumer belongs. |
| `fetchMinBytes` | long|false | 1 | The minimum byte expected for each fetch response. |
| `autoCommitEnabled` | boolean |false | true | If set to true, the consumer's offset is periodically committed in the background.<br/><br/> This committed offset is used when the process fails as the position from which a new consumer begins. |
| `autoCommitIntervalMs` | long|false | 5000 | The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if `autoCommitEnabled` is set to true. |
| `heartbeatIntervalMs` | long| false | 3000 | The interval between heartbeats to the consumer when using Kafka's group management facilities. <br/><br/>**Note: `heartbeatIntervalMs` must be smaller than `sessionTimeoutMs`**.|
| `sessionTimeoutMs` | long|false | 30000 | The timeout used to detect consumer failures when using Kafka's group management facility. |
-| `topic` | String|true | " " (empty string)| The Kafka topic which sends messages to Pulsar. |
-| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer configuration properties to be passed to consumers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. |
+| `topic` | String|true | " " (empty string)| The Kafka topic that sends messages to Pulsar. |
+| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer configuration properties to be passed to consumers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. |
| `keyDeserializationClass` | String|false | org.apache.kafka.common.serialization.StringDeserializer | The deserializer class for Kafka consumers to deserialize keys.<br/> The deserializer is set by a specific implementation of [`KafkaAbstractSource`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java).
| `valueDeserializationClass` | String|false | org.apache.kafka.common.serialization.ByteArrayDeserializer | The deserializer class for Kafka consumers to deserialize values.
-| `autoOffsetReset` | String | false | "earliest" | The default offset reset policy. |
+| `autoOffsetReset` | String | false | earliest | The default offset reset policy. |
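As an illustration of the `consumerConfigProperties` option in the table above, extra Kafka consumer properties can be nested under that key in the YAML configuration. The property names below are standard Kafka consumer settings; the exact nesting shown is an assumption, not taken from this commit:

```yaml
configs:
  bootstrapServers: "pulsar-kafka:9092"
  groupId: "test-pulsar-io"
  topic: "my-topic"
  # Passed through to the underlying Kafka consumer.
  # Note: properties set explicitly in the connector config take precedence.
  consumerConfigProperties:
    "security.protocol": "SASL_PLAINTEXT"
    "sasl.mechanism": "PLAIN"
    "max.poll.records": "500"
```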
### Schema Management
@@ -64,83 +64,65 @@ If you want to access the raw key, you can use the `Message#getKeyBytes()` API.
Before using the Kafka source connector, you need to create a configuration file through one of the following methods.
-* JSON
+- JSON
- ```json
+ ```json
{
- "configs": {
- "bootstrapServers": "pulsar-kafka:9092",
- "groupId": "test-pulsar-io",
- "topic": "my-topic",
- "sessionTimeoutMs": "10000",
- "autoCommitEnabled": false
- }
+ "bootstrapServers": "pulsar-kafka:9092",
+ "groupId": "test-pulsar-io",
+ "topic": "my-topic",
+ "sessionTimeoutMs": "10000",
+ "autoCommitEnabled": false
}
- ```
+ ```
-* YAML
+- YAML
- ```yaml
+ ```yaml
configs:
- bootstrapServers: "pulsar-kafka:9092"
- groupId: "test-pulsar-io"
- topic: "my-topic"
- sessionTimeoutMs: "10000"
+ bootstrapServers: "pulsar-kafka:9092"
+ groupId: "test-pulsar-io"
+ topic: "my-topic"
+ sessionTimeoutMs: "10000"
autoCommitEnabled: false
```
## Usage
-Here is an example of using the Kafka source connector with the configuration file as shown previously.
+You can use the Kafka source connector as a Pulsar built-in connector on a standalone cluster or an on-premises cluster.
-1. Download a Kafka client and a Kafka connector.
+### Standalone cluster
- ```bash
- $ wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.2.1/kafka-clients-0.10.2.1.jar
+This example describes how to use the Kafka source connector to feed data from Kafka and write it to Pulsar topics in standalone mode.
- $ wget https://archive.apache.org/dist/pulsar/pulsar-2.4.0/connectors/pulsar-io-kafka-2.4.0.nar
- ```
+#### Prerequisites
-2. Create a network.
-
- ```bash
- $ docker network create kafka-pulsar
- ```
+- Install [Docker](https://docs.docker.com/get-docker/) (Community Edition).
-3. Pull a ZooKeeper image and start ZooKeeper.
-
- ```bash
- $ docker pull wurstmeister/zookeeper
+#### Steps
- $ docker run -d -it -p 2181:2181 --name pulsar-kafka-zookeeper --network kafka-pulsar wurstmeister/zookeeper
- ```
+1. Download and start the Confluent Platform.
-4. Pull a Kafka image and start Kafka.
-
- ```bash
- $ docker pull wurstmeister/kafka:2.11-1.0.2
-
- $ docker run -d -it --network kafka-pulsar -p 6667:6667 -p 9092:9092 -e KAFKA_ADVERTISED_HOST_NAME=pulsar-kafka -e KAFKA_ZOOKEEPER_CONNECT=pulsar-kafka-zookeeper:2181 --name pulsar-kafka wurstmeister/kafka:2.11-1.0.2
- ```
+For details about installing the Kafka service locally, see the [documentation](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-1-download-and-start-cp).
-5. Pull a Pulsar image and start Pulsar standalone.
-
- ```bash
- $ docker pull apachepulsar/pulsar:{{pulsar:version}}
-
- $ docker run -d -it --network kafka-pulsar -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-kafka-standalone apachepulsar/pulsar:2.4.0 bin/pulsar standalone
- ```
+2. Pull a Pulsar image and start Pulsar in standalone mode.
+
+ ```bash
+ docker pull apachepulsar/pulsar:latest
+
+ docker run -d -it -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-kafka-standalone apachepulsar/pulsar:latest bin/pulsar standalone
+ ```
+
+3. Create a producer file _kafka-producer.py_.
-6. Create a producer file _kafka-producer.py_.
-
```python
from kafka import KafkaProducer
- producer = KafkaProducer(bootstrap_servers='pulsar-kafka:9092')
+ producer = KafkaProducer(bootstrap_servers='localhost:9092')
future = producer.send('my-topic', b'hello world')
future.get()
```
-7. Create a consumer file _pulsar-client.py_.
+4. Create a consumer file _pulsar-client.py_.
```python
import pulsar
@@ -158,23 +140,20 @@ Here is an example of using the Kafka source connector with the configuration fi
client.close()
```
-8. Copy the following files to Pulsar.
-
+5. Copy the following files to Pulsar.
+
```bash
- $ docker cp pulsar-io-kafka-{{pulsar:version}}.nar pulsar-kafka-standalone:/pulsar
- $ docker cp kafkaSourceConfig.yaml pulsar-kafka-standalone:/pulsar/conf
- $ docker cp pulsar-client.py pulsar-kafka-standalone:/pulsar/
- $ docker cp kafka-producer.py pulsar-kafka-standalone:/pulsar/
+ docker cp pulsar-io-kafka.nar pulsar-kafka-standalone:/pulsar
+ docker cp kafkaSourceConfig.yaml pulsar-kafka-standalone:/pulsar/conf
```
-9. Open a new terminal window and start the Kafka source connector in local run mode.
+6. Open a new terminal window and start the Kafka source connector in local run mode.
```bash
- $ docker exec -it pulsar-kafka-standalone /bin/bash
+ docker exec -it pulsar-kafka-standalone /bin/bash
- $ ./bin/pulsar-admin source localrun \
- --archive ./pulsar-io-kafka-{{pulsar:version}}.nar \
- --classname org.apache.pulsar.io.kafka.KafkaBytesSource \
+ ./bin/pulsar-admin source localrun \
+ --archive ./pulsar-io-kafka.nar \
--tenant public \
--namespace default \
--name kafka \
@@ -183,18 +162,49 @@ Here is an example of using the Kafka source connector with the configuration fi
--parallelism 1
```
-10. Open a new terminal window and run the consumer.
+7. Open a new terminal window and run the Kafka producer locally (this requires the `kafka-python` package).
```bash
- $ docker exec -it pulsar-kafka-standalone /bin/bash
+ python3 kafka-producer.py
+ ```
- $ pip install kafka-python
+8. Open a new terminal window and run the Pulsar consumer locally.
- $ python3 kafka-producer.py
+ ```bash
+ python3 pulsar-client.py
```
- The following information appears on the consumer terminal window.
+The following information appears on the consumer terminal window.
```bash
Received message: 'hello world'
```
+
+### On-premises cluster
+
+This example explains how to create a Kafka source connector in an on-premises cluster.
+
+1. Copy the NAR package of the Kafka connector to the Pulsar connectors directory.
+
+ ```
+ cp pulsar-io-kafka-{{connector:version}}.nar $PULSAR_HOME/connectors/pulsar-io-kafka-{{connector:version}}.nar
+ ```
+
+2. Reload all [built-in connectors](https://pulsar.apache.org/docs/en/next/io-connectors/).
+
+ ```
+ $PULSAR_HOME/bin/pulsar-admin sources reload
+ ```
+
+3. Check whether the Kafka source connector is available in the list.
+
+ ```
+ $PULSAR_HOME/bin/pulsar-admin sources available-sources
+ ```
+
+4. Create a Kafka source connector on a Pulsar cluster using the [`pulsar-admin sources create`](http://pulsar.apache.org/tools/pulsar-admin/2.11.0-SNAPSHOT/#-em-create-em--14) command.
+
+ ```
+ $PULSAR_HOME/bin/pulsar-admin sources create \
+ --source-config-file <kafka-source-config.yaml>
+ ```
\ No newline at end of file
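For reference, a hypothetical `kafka-source-config.yaml` of the kind passed via `--source-config-file` in the last step might look like the following. The top-level keys follow Pulsar's source configuration format, and the values are illustrative assumptions consistent with the earlier examples, not contents of this commit:

```yaml
tenant: "public"
namespace: "default"
name: "kafka"
archive: "connectors/pulsar-io-kafka-{{connector:version}}.nar"
parallelism: 1
# Connector-specific options, as documented in the property table above.
configs:
  bootstrapServers: "pulsar-kafka:9092"
  groupId: "test-pulsar-io"
  topic: "my-topic"
  sessionTimeoutMs: "10000"
  autoCommitEnabled: false
```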
diff --git a/site2/website/versioned_docs/version-2.10.0/io-kafka-source.md b/site2/website/versioned_docs/version-2.10.0/io-kafka-source.md
index 21611696648..d0d779786f0 100644
--- a/site2/website/versioned_docs/version-2.10.0/io-kafka-source.md
+++ b/site2/website/versioned_docs/version-2.10.0/io-kafka-source.md
@@ -18,18 +18,18 @@ The configuration of the Kafka source connector has the following properties.
| Name | Type| Required | Default | Description
|------|----------|---------|-------------|-------------|
-| `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. |
+| `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. |
| `groupId` |String| true | " " (empty string) | A unique string that identifies the group of consumer processes to which this consumer belongs. |
| `fetchMinBytes` | long|false | 1 | The minimum byte expected for each fetch response. |
| `autoCommitEnabled` | boolean |false | true | If set to true, the consumer's offset is periodically committed in the background.<br/><br/> This committed offset is used when the process fails as the position from which a new consumer begins. |
| `autoCommitIntervalMs` | long|false | 5000 | The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if `autoCommitEnabled` is set to true. |
| `heartbeatIntervalMs` | long| false | 3000 | The interval between heartbeats to the consumer when using Kafka's group management facilities. <br/><br/>**Note: `heartbeatIntervalMs` must be smaller than `sessionTimeoutMs`**.|
| `sessionTimeoutMs` | long|false | 30000 | The timeout used to detect consumer failures when using Kafka's group management facility. |
-| `topic` | String|true | " " (empty string)| The Kafka topic which sends messages to Pulsar. |
-| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer configuration properties to be passed to consumers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. |
+| `topic` | String|true | " " (empty string)| The Kafka topic that sends messages to Pulsar. |
+| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer configuration properties to be passed to consumers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. |
| `keyDeserializationClass` | String|false | org.apache.kafka.common.serialization.StringDeserializer | The deserializer class for Kafka consumers to deserialize keys.<br/> The deserializer is set by a specific implementation of [`KafkaAbstractSource`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java).
| `valueDeserializationClass` | String|false | org.apache.kafka.common.serialization.ByteArrayDeserializer | The deserializer class for Kafka consumers to deserialize values.
-| `autoOffsetReset` | String | false | "earliest" | The default offset reset policy. |
+| `autoOffsetReset` | String | false | earliest | The default offset reset policy. |
### Schema Management
@@ -65,83 +65,65 @@ If you want to access the raw key, you can use the `Message#getKeyBytes()` API.
Before using the Kafka source connector, you need to create a configuration file through one of the following methods.
-* JSON
+- JSON
- ```json
+ ```json
{
- "configs": {
- "bootstrapServers": "pulsar-kafka:9092",
- "groupId": "test-pulsar-io",
- "topic": "my-topic",
- "sessionTimeoutMs": "10000",
- "autoCommitEnabled": false
- }
+ "bootstrapServers": "pulsar-kafka:9092",
+ "groupId": "test-pulsar-io",
+ "topic": "my-topic",
+ "sessionTimeoutMs": "10000",
+ "autoCommitEnabled": false
}
- ```
+ ```
-* YAML
+- YAML
- ```yaml
+ ```yaml
configs:
- bootstrapServers: "pulsar-kafka:9092"
- groupId: "test-pulsar-io"
- topic: "my-topic"
- sessionTimeoutMs: "10000"
+ bootstrapServers: "pulsar-kafka:9092"
+ groupId: "test-pulsar-io"
+ topic: "my-topic"
+ sessionTimeoutMs: "10000"
autoCommitEnabled: false
```
## Usage
-Here is an example of using the Kafka source connector with the configuration file as shown previously.
+You can use the Kafka source connector as a Pulsar built-in connector on a standalone cluster or an on-premises cluster.
-1. Download a Kafka client and a Kafka connector.
+### Standalone cluster
- ```bash
- $ wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.2.1/kafka-clients-0.10.2.1.jar
+This example describes how to use the Kafka source connector to feed data from Kafka and write it to Pulsar topics in standalone mode.
- $ wget https://archive.apache.org/dist/pulsar/pulsar-2.4.0/connectors/pulsar-io-kafka-2.4.0.nar
- ```
+#### Prerequisites
-2. Create a network.
-
- ```bash
- $ docker network create kafka-pulsar
- ```
+- Install [Docker](https://docs.docker.com/get-docker/) (Community Edition).
-3. Pull a ZooKeeper image and start ZooKeeper.
-
- ```bash
- $ docker pull wurstmeister/zookeeper
+#### Steps
- $ docker run -d -it -p 2181:2181 --name pulsar-kafka-zookeeper --network kafka-pulsar wurstmeister/zookeeper
- ```
+1. Download and start the Confluent Platform.
-4. Pull a Kafka image and start Kafka.
-
- ```bash
- $ docker pull wurstmeister/kafka:2.11-1.0.2
-
- $ docker run -d -it --network kafka-pulsar -p 6667:6667 -p 9092:9092 -e KAFKA_ADVERTISED_HOST_NAME=pulsar-kafka -e KAFKA_ZOOKEEPER_CONNECT=pulsar-kafka-zookeeper:2181 --name pulsar-kafka wurstmeister/kafka:2.11-1.0.2
- ```
+For details about installing the Kafka service locally, see the [documentation](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-1-download-and-start-cp).
-5. Pull a Pulsar image and start Pulsar standalone.
-
- ```bash
- $ docker pull apachepulsar/pulsar:{{pulsar:version}}
-
- $ docker run -d -it --network kafka-pulsar -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-kafka-standalone apachepulsar/pulsar:2.4.0 bin/pulsar standalone
- ```
+2. Pull a Pulsar image and start Pulsar in standalone mode.
+
+ ```bash
+ docker pull apachepulsar/pulsar:latest
+
+ docker run -d -it -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-kafka-standalone apachepulsar/pulsar:latest bin/pulsar standalone
+ ```
+
+3. Create a producer file _kafka-producer.py_.
-6. Create a producer file _kafka-producer.py_.
-
```python
from kafka import KafkaProducer
- producer = KafkaProducer(bootstrap_servers='pulsar-kafka:9092')
+ producer = KafkaProducer(bootstrap_servers='localhost:9092')
future = producer.send('my-topic', b'hello world')
future.get()
```
-7. Create a consumer file _pulsar-client.py_.
+4. Create a consumer file _pulsar-client.py_.
```python
import pulsar
@@ -159,23 +141,20 @@ Here is an example of using the Kafka source connector with the configuration fi
client.close()
```
-8. Copy the following files to Pulsar.
-
+5. Copy the following files to Pulsar.
+
```bash
- $ docker cp pulsar-io-kafka-{{pulsar:version}}.nar pulsar-kafka-standalone:/pulsar
- $ docker cp kafkaSourceConfig.yaml pulsar-kafka-standalone:/pulsar/conf
- $ docker cp pulsar-client.py pulsar-kafka-standalone:/pulsar/
- $ docker cp kafka-producer.py pulsar-kafka-standalone:/pulsar/
+ docker cp pulsar-io-kafka.nar pulsar-kafka-standalone:/pulsar
+ docker cp kafkaSourceConfig.yaml pulsar-kafka-standalone:/pulsar/conf
```
-9. Open a new terminal window and start the Kafka source connector in local run mode.
+6. Open a new terminal window and start the Kafka source connector in local run mode.
```bash
- $ docker exec -it pulsar-kafka-standalone /bin/bash
+ docker exec -it pulsar-kafka-standalone /bin/bash
- $ ./bin/pulsar-admin source localrun \
- --archive ./pulsar-io-kafka-{{pulsar:version}}.nar \
- --classname org.apache.pulsar.io.kafka.KafkaBytesSource \
+ ./bin/pulsar-admin source localrun \
+ --archive ./pulsar-io-kafka.nar \
--tenant public \
--namespace default \
--name kafka \
@@ -184,18 +163,49 @@ Here is an example of using the Kafka source connector with the configuration fi
--parallelism 1
```
-10. Open a new terminal window and run the consumer.
+7. Open a new terminal window and run the Kafka producer locally (this requires the `kafka-python` package).
```bash
- $ docker exec -it pulsar-kafka-standalone /bin/bash
+ python3 kafka-producer.py
+ ```
- $ pip install kafka-python
+8. Open a new terminal window and run the Pulsar consumer locally.
- $ python3 kafka-producer.py
+ ```bash
+ python3 pulsar-client.py
```
- The following information appears on the consumer terminal window.
+The following information appears on the consumer terminal window.
```bash
Received message: 'hello world'
```
+
+### On-premises cluster
+
+This example explains how to create a Kafka source connector in an on-premises cluster.
+
+1. Copy the NAR package of the Kafka connector to the Pulsar connectors directory.
+
+ ```
+ cp pulsar-io-kafka-{{connector:version}}.nar $PULSAR_HOME/connectors/pulsar-io-kafka-{{connector:version}}.nar
+ ```
+
+2. Reload all [built-in connectors](https://pulsar.apache.org/docs/en/next/io-connectors/).
+
+ ```
+ $PULSAR_HOME/bin/pulsar-admin sources reload
+ ```
+
+3. Check whether the Kafka source connector is available in the list.
+
+ ```
+ $PULSAR_HOME/bin/pulsar-admin sources available-sources
+ ```
+
+4. Create a Kafka source connector on a Pulsar cluster using the [`pulsar-admin sources create`](http://pulsar.apache.org/tools/pulsar-admin/2.10.0-SNAPSHOT/#-em-create-em--14) command.
+
+ ```
+ $PULSAR_HOME/bin/pulsar-admin sources create \
+ --source-config-file <kafka-source-config.yaml>
+ ```
\ No newline at end of file
diff --git a/site2/website/versioned_docs/version-2.9.2/io-kafka-source.md b/site2/website/versioned_docs/version-2.9.2/io-kafka-source.md
index 71122fdf930..926576d5aa7 100644
--- a/site2/website/versioned_docs/version-2.9.2/io-kafka-source.md
+++ b/site2/website/versioned_docs/version-2.9.2/io-kafka-source.md
@@ -18,18 +18,18 @@ The configuration of the Kafka source connector has the following properties.
| Name | Type| Required | Default | Description
|------|----------|---------|-------------|-------------|
-| `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. |
+| `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. |
| `groupId` |String| true | " " (empty string) | A unique string that identifies the group of consumer processes to which this consumer belongs. |
| `fetchMinBytes` | long|false | 1 | The minimum byte expected for each fetch response. |
| `autoCommitEnabled` | boolean |false | true | If set to true, the consumer's offset is periodically committed in the background.<br/><br/> This committed offset is used when the process fails as the position from which a new consumer begins. |
| `autoCommitIntervalMs` | long|false | 5000 | The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if `autoCommitEnabled` is set to true. |
| `heartbeatIntervalMs` | long| false | 3000 | The interval between heartbeats to the consumer when using Kafka's group management facilities. <br/><br/>**Note: `heartbeatIntervalMs` must be smaller than `sessionTimeoutMs`**.|
| `sessionTimeoutMs` | long|false | 30000 | The timeout used to detect consumer failures when using Kafka's group management facility. |
-| `topic` | String|true | " " (empty string)| The Kafka topic which sends messages to Pulsar. |
-| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer configuration properties to be passed to consumers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. |
+| `topic` | String|true | " " (empty string)| The Kafka topic that sends messages to Pulsar. |
+| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer configuration properties to be passed to consumers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. |
| `keyDeserializationClass` | String|false | org.apache.kafka.common.serialization.StringDeserializer | The deserializer class for Kafka consumers to deserialize keys.<br/> The deserializer is set by a specific implementation of [`KafkaAbstractSource`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java).
| `valueDeserializationClass` | String|false | org.apache.kafka.common.serialization.ByteArrayDeserializer | The deserializer class for Kafka consumers to deserialize values.
-| `autoOffsetReset` | String | false | "earliest" | The default offset reset policy. |
+| `autoOffsetReset` | String | false | earliest | The default offset reset policy. |
### Schema Management
@@ -65,81 +65,65 @@ If you want to access the raw key, you can use the `Message#getKeyBytes()` API.
Before using the Kafka source connector, you need to create a configuration file through one of the following methods.
-* JSON
+- JSON
- ```json
+ ```json
{
- "bootstrapServers": "pulsar-kafka:9092",
- "groupId": "test-pulsar-io",
- "topic": "my-topic",
- "sessionTimeoutMs": "10000",
- "autoCommitEnabled": false
+ "bootstrapServers": "pulsar-kafka:9092",
+ "groupId": "test-pulsar-io",
+ "topic": "my-topic",
+ "sessionTimeoutMs": "10000",
+ "autoCommitEnabled": false
}
- ```
+ ```
-* YAML
+- YAML
- ```yaml
+ ```yaml
configs:
- bootstrapServers: "pulsar-kafka:9092"
- groupId: "test-pulsar-io"
- topic: "my-topic"
- sessionTimeoutMs: "10000"
+ bootstrapServers: "pulsar-kafka:9092"
+ groupId: "test-pulsar-io"
+ topic: "my-topic"
+ sessionTimeoutMs: "10000"
autoCommitEnabled: false
```
## Usage
-Here is an example of using the Kafka source connector with the configuration file as shown previously.
+You can use the Kafka source connector as a Pulsar built-in connector on a standalone cluster or an on-premises cluster.
-1. Download a Kafka client and a Kafka connector.
+### Standalone cluster
- ```bash
- $ wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.2.1/kafka-clients-0.10.2.1.jar
+This example describes how to use the Kafka source connector to feed data from Kafka and write it to Pulsar topics in standalone mode.
- $ wget https://archive.apache.org/dist/pulsar/pulsar-2.4.0/connectors/pulsar-io-kafka-2.4.0.nar
- ```
+#### Prerequisites
-2. Create a network.
-
- ```bash
- $ docker network create kafka-pulsar
- ```
+- Install [Docker](https://docs.docker.com/get-docker/) (Community Edition).
-3. Pull a ZooKeeper image and start ZooKeeper.
-
- ```bash
- $ docker pull wurstmeister/zookeeper
+#### Steps
- $ docker run -d -it -p 2181:2181 --name pulsar-kafka-zookeeper --network kafka-pulsar wurstmeister/zookeeper
- ```
+1. Download and start the Confluent Platform.
-4. Pull a Kafka image and start Kafka.
-
- ```bash
- $ docker pull wurstmeister/kafka:2.11-1.0.2
-
- $ docker run -d -it --network kafka-pulsar -p 6667:6667 -p 9092:9092 -e KAFKA_ADVERTISED_HOST_NAME=pulsar-kafka -e KAFKA_ZOOKEEPER_CONNECT=pulsar-kafka-zookeeper:2181 --name pulsar-kafka wurstmeister/kafka:2.11-1.0.2
- ```
+For details about installing the Kafka service locally, see the [documentation](https://docs.confluent.io/platform/current/quickstart/ce-docker-quickstart.html#step-1-download-and-start-cp).
-5. Pull a Pulsar image and start Pulsar standalone.
-
- ```bash
- $ docker pull apachepulsar/pulsar:{{pulsar:version}}
-
- $ docker run -d -it --network kafka-pulsar -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-kafka-standalone apachepulsar/pulsar:2.4.0 bin/pulsar standalone
- ```
+2. Pull a Pulsar image and start Pulsar in standalone mode.
+
+ ```bash
+ docker pull apachepulsar/pulsar:latest
+
+ docker run -d -it -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-kafka-standalone apachepulsar/pulsar:latest bin/pulsar standalone
+ ```
+
+3. Create a producer file _kafka-producer.py_.
-6. Create a producer file _kafka-producer.py_.
-
```python
from kafka import KafkaProducer
- producer = KafkaProducer(bootstrap_servers='pulsar-kafka:9092')
+ producer = KafkaProducer(bootstrap_servers='localhost:9092')
future = producer.send('my-topic', b'hello world')
future.get()
```
-7. Create a consumer file _pulsar-client.py_.
+4. Create a consumer file _pulsar-client.py_.
```python
import pulsar
@@ -157,23 +141,20 @@ Here is an example of using the Kafka source connector with the configuration fi
client.close()
```
-8. Copy the following files to Pulsar.
-
+5. Copy the following files to Pulsar.
+
```bash
- $ docker cp pulsar-io-kafka-{{pulsar:version}}.nar pulsar-kafka-standalone:/pulsar
- $ docker cp kafkaSourceConfig.yaml pulsar-kafka-standalone:/pulsar/conf
- $ docker cp pulsar-client.py pulsar-kafka-standalone:/pulsar/
- $ docker cp kafka-producer.py pulsar-kafka-standalone:/pulsar/
+ docker cp pulsar-io-kafka.nar pulsar-kafka-standalone:/pulsar
+ docker cp kafkaSourceConfig.yaml pulsar-kafka-standalone:/pulsar/conf
```
-9. Open a new terminal window and start the Kafka source connector in local run mode.
+6. Open a new terminal window and start the Kafka source connector in local run mode.
```bash
- $ docker exec -it pulsar-kafka-standalone /bin/bash
+ docker exec -it pulsar-kafka-standalone /bin/bash
- $ ./bin/pulsar-admin source localrun \
- --archive ./pulsar-io-kafka-{{pulsar:version}}.nar \
- --classname org.apache.pulsar.io.kafka.KafkaBytesSource \
+ ./bin/pulsar-admin source localrun \
+ --archive ./pulsar-io-kafka.nar \
--tenant public \
--namespace default \
--name kafka \
@@ -182,18 +163,49 @@ Here is an example of using the Kafka source connector with the configuration fi
--parallelism 1
```
-10. Open a new terminal window and run the consumer.
+7. Open a new terminal window and run the Kafka producer locally (this requires the `kafka-python` package).
```bash
- $ docker exec -it pulsar-kafka-standalone /bin/bash
+ python3 kafka-producer.py
+ ```
- $ pip install kafka-python
+8. Open a new terminal window and run the Pulsar consumer locally.
- $ python3 kafka-producer.py
+ ```bash
+ python3 pulsar-client.py
```
- The following information appears on the consumer terminal window.
+The following information appears on the consumer terminal window.
```bash
Received message: 'hello world'
```
+
+### On-premises cluster
+
+This example explains how to create a Kafka source connector in an on-premises cluster.
+
+1. Copy the NAR package of the Kafka connector to the Pulsar connectors directory.
+
+ ```
+ cp pulsar-io-kafka-{{connector:version}}.nar $PULSAR_HOME/connectors/pulsar-io-kafka-{{connector:version}}.nar
+ ```
+
+2. Reload all [built-in connectors](https://pulsar.apache.org/docs/en/next/io-connectors/).
+
+ ```
+ $PULSAR_HOME/bin/pulsar-admin sources reload
+ ```
+
+3. Check whether the Kafka source connector is available in the list.
+
+ ```
+ $PULSAR_HOME/bin/pulsar-admin sources available-sources
+ ```
+
+4. Create a Kafka source connector on a Pulsar cluster using the [`pulsar-admin sources create`](http://pulsar.apache.org/tools/pulsar-admin/2.9.0-SNAPSHOT/#-em-create-em--14) command.
+
+ ```
+ $PULSAR_HOME/bin/pulsar-admin sources create \
+ --source-config-file <kafka-source-config.yaml>
+ ```
\ No newline at end of file