Posted to commits@pulsar.apache.org by si...@apache.org on 2019/06/05 06:03:34 UTC
[pulsar] branch master updated: [issue #4074][pulsar-io]Update
document of PostgreSQL for debezium (#4376)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ef834ae [issue #4074][pulsar-io]Update document of PostgreSQL for debezium (#4376)
ef834ae is described below
commit ef834ae8e60f0707ba10b2852d2e56a46d30a13f
Author: tuteng <eg...@gmail.com>
AuthorDate: Wed Jun 5 14:03:29 2019 +0800
[issue #4074][pulsar-io]Update document of PostgreSQL for debezium (#4376)
Fixes #4074
Master Issue: #4074
### Motivation
There is currently a lack of documentation on PostgreSQL. Therefore, some useful documents about PostgreSQL are added.
### Modifications
Update document of PostgreSQL for debezium
---
site2/docs/io-cdc-debezium.md | 250 +++++++++++++++++++++++++++++++-----------
1 file changed, 186 insertions(+), 64 deletions(-)
diff --git a/site2/docs/io-cdc-debezium.md b/site2/docs/io-cdc-debezium.md
index 413d3f9..573548b 100644
--- a/site2/docs/io-cdc-debezium.md
+++ b/site2/docs/io-cdc-debezium.md
@@ -26,55 +26,45 @@ The Configuration is mostly related to Debezium task config, besides this we sho
| `pulsar.service.url` | `true` | `null` | Pulsar cluster service url. |
| `offset.storage.topic` | `true` | `null` | Record the last committed offsets that the connector successfully completed. |
-### Configuration Example
+## Example of MySQL
-Here is a configuration Json example:
+We need to create a configuration file before using the Pulsar Debezium connector.
-```$json
+### Configuration
+
+Here is a JSON configuration example:
+
+```json
{
- "tenant": "public",
- "namespace": "default",
- "name": "debezium-kafka-source",
- "className": "org.apache.pulsar.io.kafka.connect.KafkaConnectSource" ,
- "topicName": "kafka-connect-topic",
- "configs":
- {
- "task.class": "io.debezium.connector.mysql.MySqlConnectorTask",
- "database.hostname": "localhost",
- "database.port": "3306",
- "database.user": "debezium",
- "database.password": "dbz",
- "database.server.id": "184054",
- "database.server.name": "dbserver1",
- "database.whitelist": "inventory",
- "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory",
- "database.history.pulsar.topic": "history-topic",
- "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650",
- "key.converter": "org.apache.kafka.connect.json.JsonConverter",
- "value.converter": "org.apache.kafka.connect.json.JsonConverter",
- "pulsar.service.url": "pulsar://127.0.0.1:6650",
- "offset.storage.topic": "offset-topic"
- },
- "archive": "connectors/pulsar-io-debezium-mysql-{{pulsar:version}}.nar"
+ "database.hostname": "localhost",
+ "database.port": "3306",
+ "database.user": "debezium",
+ "database.password": "dbz",
+ "database.server.id": "184054",
+ "database.server.name": "dbserver1",
+ "database.whitelist": "inventory",
+ "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory",
+ "database.history.pulsar.topic": "history-topic",
+ "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650",
+ "key.converter": "org.apache.kafka.connect.json.JsonConverter",
+ "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+ "pulsar.service.url": "pulsar://127.0.0.1:6650",
+ "offset.storage.topic": "offset-topic"
}
```
-You could also find the yaml example in this [file](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka-connect-adaptor/src/main/resources/debezium-mysql-source-config.yaml), which has similar content below:
+Optionally, you can create a `debezium-mysql-source-config.yaml` file and copy the [contents](https://github.com/apache/pulsar/blob/master/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml) below into it.
```$yaml
-tenant: "public"
+tenant: "public"
namespace: "default"
-name: "debezium-kafka-source"
-topicName: "kafka-connect-topic"
+name: "debezium-mysql-source"
+topicName: "debezium-mysql-topic"
archive: "connectors/pulsar-io-debezium-mysql-{{pulsar:version}}.nar"
-##autoAck: true
parallelism: 1
configs:
- ## sourceTask
- task.class: "io.debezium.connector.mysql.MySqlConnectorTask"
-
## config for mysql, docker image: debezium/example-mysql:0.8
database.hostname: "localhost"
database.port: "3306"
@@ -96,43 +86,175 @@ configs:
offset.storage.topic: "offset-topic"
```
-### Usage example
+### Usage
-Here is a simple example to store MySQL change data using above example config.
+This example shows how to store the data changes of a MySQL table using the configuration file in the example above.
-- Start a MySQL server with an example database, from which Debezium can capture changes.
-```$bash
- docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.8
-```
+1. Start a MySQL server with an example database, from which Debezium can capture changes.
-- Start a Pulsar service locally in standalone mode.
-```$bash
- bin/pulsar standalone
-```
+ ```$bash
+ docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.8
+ ```
-- Start pulsar debezium connector, with local run mode, and using above yaml config file. Please make sure that the nar file is available as configured in path `connectors/pulsar-io-debezium-mysql-{{pulsar:version}}.nar`.
-```$bash
- bin/pulsar-admin sources localrun --sourceConfigFile debezium-mysql-source-config.yaml
-```
+2. Start a Pulsar service locally in standalone mode.
-- Subscribe the topic for table `inventory.products`.
-```
- bin/pulsar-client consume -s "sub-products" public/default/dbserver1.inventory.products -n 0
-```
+ ```$bash
+ bin/pulsar standalone
+ ```
-- start a MySQL cli docker connector, and use it we could change to the table `products` in MySQL server.
-```$bash
-$docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
-```
+3. Start the Pulsar Debezium connector in local run mode, using the YAML configuration file above. Make sure that the nar file is available at `connectors/pulsar-io-debezium-mysql-{{pulsar:version}}.nar`.
+
+ ```$bash
+ bin/pulsar-admin source localrun --source-config-file debezium-mysql-source-config.yaml
+ ```
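+
+ Optionally, start the connector in local run mode and pass the JSON configuration from the example above inline.
+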
+ ```$bash
+ bin/pulsar-admin source localrun --archive connectors/pulsar-io-debezium-mysql-{{pulsar:version}}.nar --name debezium-mysql-source --destination-topic-name debezium-mysql-topic --tenant public --namespace default --source-config '{"database.hostname": "localhost","database.port": "3306","database.user": "debezium","database.password": "dbz","database.server.id": "184054","database.server.name": "dbserver1","database.whitelist": "inventory","database.history": "org.apache.pulsar.io.de [...]
+ ```
+
+4. Subscribe to the topic for the `inventory.products` table.
+
+ ```
+ bin/pulsar-client consume -s "sub-products" public/default/dbserver1.inventory.products -n 0
+ ```
-This command will pop out MySQL cli, in this cli, we could do a change in table products, use commands below to change the name of 2 items in table products:
+5. Start a MySQL CLI Docker container, which you can use to change the `products` table on the MySQL server.
+ ```$bash
+ $ docker run -it --rm --name mysqlterm --link mysql mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
+ ```
+
+6. The command above opens the MySQL CLI. Use the commands below to change the names of two items in the `products` table:
+
+ ```
+ mysql> use inventory;
+ mysql> show tables;
+ mysql> SELECT * FROM products ;
+ mysql> UPDATE products SET name='1111111111' WHERE id=101;
+ mysql> UPDATE products SET name='1111111111' WHERE id=107;
+ ```
+
+ In the terminal tab that subscribes to the topic, you can see that the two changes have been recorded in the products topic.
+
+## Example of PostgreSQL
+
+We need to create a configuration file before using the Pulsar Debezium connector.
+
+### Configuration
+
+
+Here is a JSON configuration example:
+
+```json
+{
+ "database.hostname": "localhost",
+ "database.port": "5432",
+ "database.user": "postgres",
+ "database.password": "postgres",
+ "database.dbname": "postgres",
+ "database.server.name": "dbserver1",
+ "schema.whitelist": "inventory",
+ "pulsar.service.url": "pulsar://127.0.0.1:6650"
+}
```
-mysql> use inventory;
-mysql> show tables;
-mysql> SELECT * FROM products ;
-mysql> UPDATE products SET name='1111111111' WHERE id=101;
-mysql> UPDATE products SET name='1111111111' WHERE id=107;
+
+
+Optionally, you can create a `debezium-postgres-source-config.yaml` file and copy the [contents](https://github.com/apache/pulsar/blob/master/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml) below into it.
+
+```yaml
+tenant: "public"
+namespace: "default"
+name: "debezium-postgres-source"
+topicName: "debezium-postgres-topic"
+archive: "connectors/pulsar-io-debezium-postgres-{{pulsar:version}}.nar"
+
+parallelism: 1
+
+configs:
+ ## config for pg, docker image: debezium/example-postgres:0.8
+ database.hostname: "localhost"
+ database.port: "5432"
+ database.user: "postgres"
+ database.password: "postgres"
+ database.dbname: "postgres"
+ database.server.name: "dbserver1"
+ schema.whitelist: "inventory"
+
+ ## PULSAR_SERVICE_URL_CONFIG
+ pulsar.service.url: "pulsar://127.0.0.1:6650"
```
-- In above subscribe topic terminal tab, we could find that 2 changes has been kept into products topic.
\ No newline at end of file
+### Usage
+
+This example shows how to store the data changes of a PostgreSQL table using the configuration file in the example above.
+
+
+1. Start a PostgreSQL server with an example database, from which Debezium can capture changes.
+
+ ```$bash
+ docker pull debezium/example-postgres:0.8
+ docker run -d -it --rm --name pulsar-postgresql -p 5432:5432 debezium/example-postgres:0.8
+ ```
+
+2. Start a Pulsar service locally in standalone mode.
+
+ ```$bash
+ bin/pulsar standalone
+ ```
+
+3. Start the Pulsar Debezium connector in local run mode and use the JSON or YAML configuration file in the example above. Make sure the nar file is available at `connectors/pulsar-io-debezium-postgres-{{pulsar:version}}.nar`.
+
+ ```$bash
+ bin/pulsar-admin source localrun --source-config-file debezium-postgres-source-config.yaml
+ ```
+
+Optionally, start the Pulsar Debezium connector in local run mode and pass the JSON configuration from the example above inline.
+
+
+ ```$bash
+ bin/pulsar-admin source localrun --archive connectors/pulsar-io-debezium-postgres-{{pulsar:version}}.nar --name debezium-postgres-source --destination-topic-name debezium-postgres-topic --tenant public --namespace default --source-config '{"database.hostname": "localhost","database.port": "5432","database.user": "postgres","database.password": "postgres","database.dbname": "postgres","database.server.name": "dbserver1","schema.whitelist": "inventory","pulsar.service.url": "pulsar://1 [...]
+ ```
+
+
+4. Open a shell in the PostgreSQL container and start the PostgreSQL CLI (`psql`). Then use the commands below to update the `products` table.
+
+ ```bash
+ docker exec -it pulsar-postgresql /bin/bash
+ ```
+
+ ```
+ psql -U postgres postgres
+ postgres=# \c postgres;
+ You are now connected to database "postgres" as user "postgres".
+ postgres=# SET search_path TO inventory;
+ SET
+ postgres=# select * from products;
+ id | name | description | weight
+ -----+--------------------+---------------------------------------------------------+--------
+ 102 | car battery | 12V car battery | 8.1
+ 103 | 12-pack drill bits | 12-pack of drill bits with sizes ranging from #40 to #3 | 0.8
+ 104 | hammer | 12oz carpenter's hammer | 0.75
+ 105 | hammer | 14oz carpenter's hammer | 0.875
+ 106 | hammer | 16oz carpenter's hammer | 1
+ 107 | rocks | box of assorted rocks | 5.3
+ 108 | jacket | water resistent black wind breaker | 0.1
+ 109 | spare tire | 24 inch spare tire | 22.2
+ 101 | 1111111111 | Small 2-wheel scooter | 3.14
+ (9 rows)
+
+ postgres=# UPDATE products SET name='1111111111' WHERE id=107;
+ UPDATE 1
+ ```
+
+5. Subscribe to the topic for the `inventory.products` table.
+
+ ```
+ bin/pulsar-client consume -s "sub-products" public/default/dbserver1.inventory.products -n 0
+ ```
+
+ You then receive a message like the following:
+
+ ```bash
+ ----- got message -----
+ {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.products.Key"},"payload":{"id":107}}�{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products [...]
+ ```
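
The consumed message above begins with a JSON document that describes the record key, followed by the record value. The following is a minimal Python sketch of decoding only the key and reading the primary key of the changed row. It assumes the key is the first JSON document in the printed text; the value portion is truncated in the output above, so it is represented here by a placeholder, and the function name `first_json_document` is hypothetical.

```python
import json

# Key portion of the message shown above, followed by a placeholder for the
# value portion (which is truncated in the output and not reproduced here).
raw_message = (
    '{"schema":{"type":"struct","fields":'
    '[{"type":"int32","optional":false,"field":"id"}],'
    '"optional":false,"name":"dbserver1.inventory.products.Key"},'
    '"payload":{"id":107}}'
    "<rest of message>"
)


def first_json_document(text: str):
    """Hypothetical helper: decode the first JSON document in `text`.

    Returns the decoded object and the index at which it ends, so that
    anything after the key can be handled separately.
    """
    decoder = json.JSONDecoder()
    document, end = decoder.raw_decode(text)
    return document, end


if __name__ == "__main__":
    key, end = first_json_document(raw_message)
    # The key payload carries the primary key of the changed row.
    print("changed row id:", key["payload"]["id"])
    print("unparsed remainder starts at index", end)
```

`json.JSONDecoder.raw_decode` is used instead of `json.loads` because it tolerates trailing data after the first document, which is the situation in the printed message.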