You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/20 08:24:28 UTC
[GitHub] [pulsar] gvolpe created a discussion: [Pulsar 2.10.0] - Unable to get Debezium PostgreSQL working
GitHub user gvolpe created a discussion: [Pulsar 2.10.0] - Unable to get Debezium PostgreSQL working
Hi,
I've followed the [documentation](https://pulsar.apache.org/docs/io-cdc-debezium/) trying to use my own database and tables, but I'm unable to get this working.
### Docker Compose
```yaml
version: '3.6'
networks:
app: {}
services:
postgres:
restart: always
image: debezium/postgres:14-alpine
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=trading
ports:
- 5432:5432
networks:
- app
volumes:
- ./modules/forecasts/src/main/resources/db/migration/V1__baseline.sql:/docker-entrypoint-initdb.d/init.sql
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres -d trading"]
interval: 5s
timeout: 20s
retries: 15
pulsar:
restart: always
image: apachepulsar/pulsar:2.10.0
ports:
- 6650:6650
- 8080:8080
command: >
/bin/bash -c "bin/pulsar standalone"
networks:
- app
volumes:
- ./pulsarconf/standalone.conf:/pulsar/conf/standalone.conf
- ./pulsarconf/connectors/pulsar-io-debezium-postgres-2.10.1.nar:/pulsar/connectors/debezium-2.10.1.nar
- ./pulsarconf/debezium-postgres-config.yaml:/pulsar/conf/debezium-pg.yaml
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8080/admin/v2/brokers/health"]
interval: 2s
timeout: 5s
retries: 15
start_period: 2m
```
### Debezium Postgres config
I've noticed that `database.dbname` only comes up in the Postgres example, but it is not listed in the configuration properties' table at the top of the documentation, but if I fail to provide it, the connector would fail to start. So I guess this is a documentation hiccup?
```yaml
tenant: "public"
namespace: "default"
name: "debezium-postgres-source"
topicName: "debezium-postgres-topic"
archive: "connectors/debezium-2.10.1.nar"
parallelism: 1
configs:
database.hostname: "postgres"
database.port: "5432"
database.user: "postgres"
database.password: "postgres"
database.dbname: "trading"
database.server.name: "dbserver"
#plugin.name: "wal2json"
plugin.name: "pgoutput"
schema.whitelist: "trading"
database.whitelist: "trading"
table.whitelist: "trading.authors"
pulsar.service.url: "pulsar://pulsar:6650"
database.history.pulsar.service.url: "pulsar://pulsar:6650"
```
### Pulsar Debezium Connector
Once Postgres and Pulsar are up, I run the following command to start the Pulsar Debezium connector.
```console
$ docker-compose exec -T pulsar bin/pulsar-admin source localrun --source-config-file /pulsar/conf/debezium-pg.yaml
...
2022-09-20T08:10:22,579+0000 [pool-3-thread-1] INFO io.debezium.jdbc.JdbcConnection - Connection gracefully closed
2022-09-20T08:10:22,604+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.util.Threads - Requested thread factory for connector PostgresConnector, id = dbserver named = keep-alive
2022-09-20T08:10:22,604+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.util.Threads - Creating thread debezium-postgresconnector-dbserver-keep-alive
2022-09-20T08:10:22,607+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource - Searching for WAL resume position
```
The log shows the last lines of output. Once there is some activity in my database (e.g. insert or update), I see this log only for the first operation. Then nothing else happens, no more logs, even if I continue to insert records.
```console
2022-09-20T08:11:18,247+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.connection.WalPositionLocator - First LSN 'LSN{0/1886078}' received
2022-09-20T08:11:18,247+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource - WAL resume position 'LSN{0/1886078}' discovered
2022-09-20T08:11:18,248+0000 [pool-4-thread-1] INFO io.debezium.jdbc.JdbcConnection - Connection gracefully closed
2022-09-20T08:11:18,249+0000 [pool-5-thread-1] INFO io.debezium.jdbc.JdbcConnection - Connection gracefully closed
2022-09-20T08:11:18,256+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.connection.PostgresReplicationConnection - Initializing PgOutput logical decoder publication
2022-09-20T08:11:18,268+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.util.Threads - Requested thread factory for connector PostgresConnector, id = dbserver named = keep-alive
2022-09-20T08:11:18,268+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.util.Threads - Creating thread debezium-postgresconnector-dbserver-keep-alive
2022-09-20T08:11:18,269+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource - Processing messages
2022-09-20T08:11:18,775+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.connection.WalPositionLocator - Message with LSN 'LSN{0/1886078}' arrived, switching off the filtering
```
### PostgreSQL
On the Postgres side, everything seems alright. I'm able to create and update records.
```console
$ select * from authors;
id | name | website
--------------------------------------+--------+---------
37d58e4c-8379-4fac-beb4-88f5d99adac5 | gvolpe | gvolpe.com
d35b1167-8442-4dc3-bef4-589f36b982d7 | jdoe |
(2 rows)
```
### Consumer
The consumer doesn't get any events. This is the full log.
```console
$ docker-compose exec pulsar bin/pulsar-client consume -s "cdc-authors" persistent://public/default/dbserver.trading.authors
2022-09-20T08:11:38,357+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xd4bdab64, L:/127.0.0.1:59816 - R:localhost/127.0.0.1:6650]] Connected to server
2022-09-20T08:11:38,457+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":["persistent://public/default/dbserver.trading.authors"],"topicsPattern":null,"subscriptionName":"cdc-authors","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":null,"ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":60,"regexSubscriptionMode":"PersistentOnly","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartit
ions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"keySharedPolicy":null,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":true,"startPaused":false,"maxPendingChuckedMessage":10}
2022-09-20T08:11:38,470+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650/","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":1,"numListenerThreads":1,"connectionsPerBroker":1,"useTcpNoDelay":true,"useTls":false,"tlsTrustCertsFilePath":"","tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":"","tlsTrustStorePassword":"","tlsCiphers":[],"tlsProtocols
":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null}
2022-09-20T08:11:38,479+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/dbserver.trading.authors][cdc-authors] Subscribing to topic on cnx [id: 0xd4bdab64, L:/127.0.0.1:59816 - R:localhost/127.0.0.1:6650], consumerId 0
2022-09-20T08:11:38,542+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/dbserver.trading.authors][cdc-authors] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
```
---
Has anyone out there been able to get this working with Pulsar 2.10.0 or newer?
Would appreciate any help, thanks!
GitHub link: https://github.com/apache/pulsar/discussions/17735
----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org
[GitHub] [pulsar] gvolpe added a comment to the discussion: [Pulsar 2.10.1] - Unable to get Debezium PostgreSQL working
Posted by GitBox <gi...@apache.org>.
GitHub user gvolpe added a comment to the discussion: [Pulsar 2.10.1] - Unable to get Debezium PostgreSQL working
Ok, managed to get this working with the following configuration:
```yaml
tenant: "public"
namespace: "default"
name: "debezium-postgres-source"
topicName: "debezium-postgres-topic"
archive: "connectors/debezium-2.10.1.nar"
parallelism: 1
configs:
database.hostname: "postgres"
database.port: "5432"
database.user: "postgres"
database.password: "postgres"
database.dbname: "trading"
database.server.name: "dbserver"
plugin.name: "pgoutput"
schema.whitelist: "public"
table.whitelist: "public.authors"
pulsar.service.url: "pulsar://pulsar:6650"
database.history.pulsar.service.url: "pulsar://pulsar:6650"
```
So it doesn't matter if one sets `database.whitelist: "my-db"`, it seems the `schema.whitelist` is mandatory.
GitHub link: https://github.com/apache/pulsar/discussions/17735#discussioncomment-3689166
----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org
[GitHub] [pulsar] gvolpe edited a discussion: [Pulsar 2.10.1] - Unable to get Debezium PostgreSQL working
Posted by GitBox <gi...@apache.org>.
GitHub user gvolpe edited a discussion: [Pulsar 2.10.1] - Unable to get Debezium PostgreSQL working
Hi,
I've followed the [documentation](https://pulsar.apache.org/docs/io-cdc-debezium/) trying to use my own database and tables, but I'm unable to get this working.
### Docker Compose
```yaml
version: '3.6'
networks:
app: {}
services:
postgres:
restart: always
image: debezium/postgres:14-alpine
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=trading
ports:
- 5432:5432
networks:
- app
volumes:
- ./modules/forecasts/src/main/resources/db/migration/V1__baseline.sql:/docker-entrypoint-initdb.d/init.sql
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres -d trading"]
interval: 5s
timeout: 20s
retries: 15
pulsar:
restart: always
image: apachepulsar/pulsar:2.10.1
ports:
- 6650:6650
- 8080:8080
command: >
/bin/bash -c "bin/pulsar standalone"
networks:
- app
volumes:
- ./pulsarconf/standalone.conf:/pulsar/conf/standalone.conf
- ./pulsarconf/connectors/pulsar-io-debezium-postgres-2.10.1.nar:/pulsar/connectors/debezium-2.10.1.nar
- ./pulsarconf/debezium-postgres-config.yaml:/pulsar/conf/debezium-pg.yaml
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8080/admin/v2/brokers/health"]
interval: 2s
timeout: 5s
retries: 15
start_period: 2m
```
### Debezium Postgres config
I've noticed that `database.dbname` only comes up in the Postgres example, but it is not listed in the configuration properties' table at the top of the documentation, but if I fail to provide it, the connector would fail to start. So I guess this is a documentation hiccup?
```yaml
tenant: "public"
namespace: "default"
name: "debezium-postgres-source"
topicName: "debezium-postgres-topic"
archive: "connectors/debezium-2.10.1.nar"
parallelism: 1
configs:
database.hostname: "postgres"
database.port: "5432"
database.user: "postgres"
database.password: "postgres"
database.dbname: "trading"
database.server.name: "dbserver"
#plugin.name: "wal2json"
plugin.name: "pgoutput"
schema.whitelist: "trading"
database.whitelist: "trading"
table.whitelist: "trading.authors"
pulsar.service.url: "pulsar://pulsar:6650"
database.history.pulsar.service.url: "pulsar://pulsar:6650"
```
### Pulsar Debezium Connector
Once Postgres and Pulsar are up, I run the following command to start the Pulsar Debezium connector.
```console
$ docker-compose exec -T pulsar bin/pulsar-admin source localrun --source-config-file /pulsar/conf/debezium-pg.yaml
...
2022-09-20T08:10:22,579+0000 [pool-3-thread-1] INFO io.debezium.jdbc.JdbcConnection - Connection gracefully closed
2022-09-20T08:10:22,604+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.util.Threads - Requested thread factory for connector PostgresConnector, id = dbserver named = keep-alive
2022-09-20T08:10:22,604+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.util.Threads - Creating thread debezium-postgresconnector-dbserver-keep-alive
2022-09-20T08:10:22,607+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource - Searching for WAL resume position
```
The log shows the last lines of output. Once there is some activity in my database (e.g. insert or update), I see this log only for the first operation. Then nothing else happens, no more logs, even if I continue to insert records.
```console
2022-09-20T08:11:18,247+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.connection.WalPositionLocator - First LSN 'LSN{0/1886078}' received
2022-09-20T08:11:18,247+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource - WAL resume position 'LSN{0/1886078}' discovered
2022-09-20T08:11:18,248+0000 [pool-4-thread-1] INFO io.debezium.jdbc.JdbcConnection - Connection gracefully closed
2022-09-20T08:11:18,249+0000 [pool-5-thread-1] INFO io.debezium.jdbc.JdbcConnection - Connection gracefully closed
2022-09-20T08:11:18,256+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.connection.PostgresReplicationConnection - Initializing PgOutput logical decoder publication
2022-09-20T08:11:18,268+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.util.Threads - Requested thread factory for connector PostgresConnector, id = dbserver named = keep-alive
2022-09-20T08:11:18,268+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.util.Threads - Creating thread debezium-postgresconnector-dbserver-keep-alive
2022-09-20T08:11:18,269+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource - Processing messages
2022-09-20T08:11:18,775+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.connection.WalPositionLocator - Message with LSN 'LSN{0/1886078}' arrived, switching off the filtering
```
### PostgreSQL
On the Postgres side, everything seems alright. I'm able to create and update records.
```console
$ select * from authors;
id | name | website
--------------------------------------+--------+---------
37d58e4c-8379-4fac-beb4-88f5d99adac5 | gvolpe | gvolpe.com
d35b1167-8442-4dc3-bef4-589f36b982d7 | jdoe |
(2 rows)
```
### Consumer
The consumer doesn't get any events. This is the full log.
```console
$ docker-compose exec pulsar bin/pulsar-client consume -s "cdc-authors" persistent://public/default/dbserver.trading.authors
2022-09-20T08:11:38,357+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xd4bdab64, L:/127.0.0.1:59816 - R:localhost/127.0.0.1:6650]] Connected to server
2022-09-20T08:11:38,457+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":["persistent://public/default/dbserver.trading.authors"],"topicsPattern":null,"subscriptionName":"cdc-authors","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":null,"ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":60,"regexSubscriptionMode":"PersistentOnly","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartit
ions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"keySharedPolicy":null,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":true,"startPaused":false,"maxPendingChuckedMessage":10}
2022-09-20T08:11:38,470+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650/","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":1,"numListenerThreads":1,"connectionsPerBroker":1,"useTcpNoDelay":true,"useTls":false,"tlsTrustCertsFilePath":"","tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":"","tlsTrustStorePassword":"","tlsCiphers":[],"tlsProtocols
":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null}
2022-09-20T08:11:38,479+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/dbserver.trading.authors][cdc-authors] Subscribing to topic on cnx [id: 0xd4bdab64, L:/127.0.0.1:59816 - R:localhost/127.0.0.1:6650], consumerId 0
2022-09-20T08:11:38,542+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/dbserver.trading.authors][cdc-authors] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
```
### Topics
Before I run the consumer and after I produce SQL data, these are the only topics I see.
```console
$ docker-compose exec -T pulsar bin/pulsar-admin topics list public/default
persistent://public/default/__transaction_buffer_snapshot
persistent://public/default/__change_events
persistent://public/default/debezium-postgres-source-debezium-offset-topic
persistent://public/default/debezium-postgres-topic
```
I would expect to have `persistent://public/default/dbserver.trading.authors` as well, but I don't see any producer for this topic when running the Debezium connector.
---
Has anyone out there been able to get this working with Pulsar 2.10.1 or newer?
Would appreciate any help, thanks!
GitHub link: https://github.com/apache/pulsar/discussions/17735
----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org
[GitHub] [pulsar] gvolpe edited a discussion: [Pulsar 2.10.1] - Unable to get Debezium PostgreSQL working
Posted by GitBox <gi...@apache.org>.
GitHub user gvolpe edited a discussion: [Pulsar 2.10.1] - Unable to get Debezium PostgreSQL working
Hi,
I've followed the [documentation](https://pulsar.apache.org/docs/io-cdc-debezium/) trying to use my own database and tables, but I'm unable to get this working.
### Docker Compose
```yaml
version: '3.6'
networks:
app: {}
services:
postgres:
restart: always
image: debezium/postgres:14-alpine
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=trading
ports:
- 5432:5432
networks:
- app
volumes:
- ./modules/forecasts/src/main/resources/db/migration/V1__baseline.sql:/docker-entrypoint-initdb.d/init.sql
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres -d trading"]
interval: 5s
timeout: 20s
retries: 15
pulsar:
restart: always
image: apachepulsar/pulsar:2.10.0
ports:
- 6650:6650
- 8080:8080
command: >
/bin/bash -c "bin/pulsar standalone"
networks:
- app
volumes:
- ./pulsarconf/standalone.conf:/pulsar/conf/standalone.conf
- ./pulsarconf/connectors/pulsar-io-debezium-postgres-2.10.1.nar:/pulsar/connectors/debezium-2.10.1.nar
- ./pulsarconf/debezium-postgres-config.yaml:/pulsar/conf/debezium-pg.yaml
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8080/admin/v2/brokers/health"]
interval: 2s
timeout: 5s
retries: 15
start_period: 2m
```
### Debezium Postgres config
I've noticed that `database.dbname` only comes up in the Postgres example, but it is not listed in the configuration properties' table at the top of the documentation, but if I fail to provide it, the connector would fail to start. So I guess this is a documentation hiccup?
```yaml
tenant: "public"
namespace: "default"
name: "debezium-postgres-source"
topicName: "debezium-postgres-topic"
archive: "connectors/debezium-2.10.1.nar"
parallelism: 1
configs:
database.hostname: "postgres"
database.port: "5432"
database.user: "postgres"
database.password: "postgres"
database.dbname: "trading"
database.server.name: "dbserver"
#plugin.name: "wal2json"
plugin.name: "pgoutput"
schema.whitelist: "trading"
database.whitelist: "trading"
table.whitelist: "trading.authors"
pulsar.service.url: "pulsar://pulsar:6650"
database.history.pulsar.service.url: "pulsar://pulsar:6650"
```
### Pulsar Debezium Connector
Once Postgres and Pulsar are up, I run the following command to start the Pulsar Debezium connector.
```console
$ docker-compose exec -T pulsar bin/pulsar-admin source localrun --source-config-file /pulsar/conf/debezium-pg.yaml
...
2022-09-20T08:10:22,579+0000 [pool-3-thread-1] INFO io.debezium.jdbc.JdbcConnection - Connection gracefully closed
2022-09-20T08:10:22,604+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.util.Threads - Requested thread factory for connector PostgresConnector, id = dbserver named = keep-alive
2022-09-20T08:10:22,604+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.util.Threads - Creating thread debezium-postgresconnector-dbserver-keep-alive
2022-09-20T08:10:22,607+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource - Searching for WAL resume position
```
The log shows the last lines of output. Once there is some activity in my database (e.g. insert or update), I see this log only for the first operation. Then nothing else happens, no more logs, even if I continue to insert records.
```console
2022-09-20T08:11:18,247+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.connection.WalPositionLocator - First LSN 'LSN{0/1886078}' received
2022-09-20T08:11:18,247+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource - WAL resume position 'LSN{0/1886078}' discovered
2022-09-20T08:11:18,248+0000 [pool-4-thread-1] INFO io.debezium.jdbc.JdbcConnection - Connection gracefully closed
2022-09-20T08:11:18,249+0000 [pool-5-thread-1] INFO io.debezium.jdbc.JdbcConnection - Connection gracefully closed
2022-09-20T08:11:18,256+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.connection.PostgresReplicationConnection - Initializing PgOutput logical decoder publication
2022-09-20T08:11:18,268+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.util.Threads - Requested thread factory for connector PostgresConnector, id = dbserver named = keep-alive
2022-09-20T08:11:18,268+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.util.Threads - Creating thread debezium-postgresconnector-dbserver-keep-alive
2022-09-20T08:11:18,269+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource - Processing messages
2022-09-20T08:11:18,775+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.connection.WalPositionLocator - Message with LSN 'LSN{0/1886078}' arrived, switching off the filtering
```
### PostgreSQL
On the Postgres side, everything seems alright. I'm able to create and update records.
```console
$ select * from authors;
id | name | website
--------------------------------------+--------+---------
37d58e4c-8379-4fac-beb4-88f5d99adac5 | gvolpe | gvolpe.com
d35b1167-8442-4dc3-bef4-589f36b982d7 | jdoe |
(2 rows)
```
### Consumer
The consumer doesn't get any events. This is the full log.
```console
$ docker-compose exec pulsar bin/pulsar-client consume -s "cdc-authors" persistent://public/default/dbserver.trading.authors
2022-09-20T08:11:38,357+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xd4bdab64, L:/127.0.0.1:59816 - R:localhost/127.0.0.1:6650]] Connected to server
2022-09-20T08:11:38,457+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":["persistent://public/default/dbserver.trading.authors"],"topicsPattern":null,"subscriptionName":"cdc-authors","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":null,"ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":60,"regexSubscriptionMode":"PersistentOnly","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartit
ions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"keySharedPolicy":null,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":true,"startPaused":false,"maxPendingChuckedMessage":10}
2022-09-20T08:11:38,470+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650/","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":1,"numListenerThreads":1,"connectionsPerBroker":1,"useTcpNoDelay":true,"useTls":false,"tlsTrustCertsFilePath":"","tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":"","tlsTrustStorePassword":"","tlsCiphers":[],"tlsProtocols
":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null}
2022-09-20T08:11:38,479+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/dbserver.trading.authors][cdc-authors] Subscribing to topic on cnx [id: 0xd4bdab64, L:/127.0.0.1:59816 - R:localhost/127.0.0.1:6650], consumerId 0
2022-09-20T08:11:38,542+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/dbserver.trading.authors][cdc-authors] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
```
### Topics
Before I run the consumer and after I produce SQL data, these are the only topics I see.
```console
$ docker-compose exec -T pulsar bin/pulsar-admin topics list public/default
persistent://public/default/__transaction_buffer_snapshot
persistent://public/default/__change_events
persistent://public/default/debezium-postgres-source-debezium-offset-topic
persistent://public/default/debezium-postgres-topic
```
I would expect to have `persistent://public/default/dbserver.trading.authors` as well, but I don't see any producer for this topic when running the Debezium connector.
---
Has anyone out there been able to get this working with Pulsar 2.10.0 or newer?
Would appreciate any help, thanks!
GitHub link: https://github.com/apache/pulsar/discussions/17735
----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org
[GitHub] [pulsar] gvolpe edited a discussion: [Pulsar 2.10.0] - Unable to get Debezium PostgreSQL working
Posted by GitBox <gi...@apache.org>.
GitHub user gvolpe edited a discussion: [Pulsar 2.10.0] - Unable to get Debezium PostgreSQL working
Hi,
I've followed the [documentation](https://pulsar.apache.org/docs/io-cdc-debezium/) trying to use my own database and tables, but I'm unable to get this working.
### Docker Compose
```yaml
version: '3.6'
networks:
app: {}
services:
postgres:
restart: always
image: debezium/postgres:14-alpine
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=trading
ports:
- 5432:5432
networks:
- app
volumes:
- ./modules/forecasts/src/main/resources/db/migration/V1__baseline.sql:/docker-entrypoint-initdb.d/init.sql
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres -d trading"]
interval: 5s
timeout: 20s
retries: 15
pulsar:
restart: always
image: apachepulsar/pulsar:2.10.0
ports:
- 6650:6650
- 8080:8080
command: >
/bin/bash -c "bin/pulsar standalone"
networks:
- app
volumes:
- ./pulsarconf/standalone.conf:/pulsar/conf/standalone.conf
- ./pulsarconf/connectors/pulsar-io-debezium-postgres-2.10.1.nar:/pulsar/connectors/debezium-2.10.1.nar
- ./pulsarconf/debezium-postgres-config.yaml:/pulsar/conf/debezium-pg.yaml
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8080/admin/v2/brokers/health"]
interval: 2s
timeout: 5s
retries: 15
start_period: 2m
```
### Debezium Postgres config
I've noticed that `database.dbname` only comes up in the Postgres example, but it is not listed in the configuration properties' table at the top of the documentation, but if I fail to provide it, the connector would fail to start. So I guess this is a documentation hiccup?
```yaml
tenant: "public"
namespace: "default"
name: "debezium-postgres-source"
topicName: "debezium-postgres-topic"
archive: "connectors/debezium-2.10.1.nar"
parallelism: 1
configs:
database.hostname: "postgres"
database.port: "5432"
database.user: "postgres"
database.password: "postgres"
database.dbname: "trading"
database.server.name: "dbserver"
#plugin.name: "wal2json"
plugin.name: "pgoutput"
schema.whitelist: "trading"
database.whitelist: "trading"
table.whitelist: "trading.authors"
pulsar.service.url: "pulsar://pulsar:6650"
database.history.pulsar.service.url: "pulsar://pulsar:6650"
```
### Pulsar Debezium Connector
Once Postgres and Pulsar are up, I run the following command to start the Pulsar Debezium connector.
```console
$ docker-compose exec -T pulsar bin/pulsar-admin source localrun --source-config-file /pulsar/conf/debezium-pg.yaml
...
2022-09-20T08:10:22,579+0000 [pool-3-thread-1] INFO io.debezium.jdbc.JdbcConnection - Connection gracefully closed
2022-09-20T08:10:22,604+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.util.Threads - Requested thread factory for connector PostgresConnector, id = dbserver named = keep-alive
2022-09-20T08:10:22,604+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.util.Threads - Creating thread debezium-postgresconnector-dbserver-keep-alive
2022-09-20T08:10:22,607+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource - Searching for WAL resume position
```
The log shows the last lines of output. Once there is some activity in my database (e.g. insert or update), I see this log only for the first operation. Then nothing else happens, no more logs, even if I continue to insert records.
```console
2022-09-20T08:11:18,247+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.connection.WalPositionLocator - First LSN 'LSN{0/1886078}' received
2022-09-20T08:11:18,247+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource - WAL resume position 'LSN{0/1886078}' discovered
2022-09-20T08:11:18,248+0000 [pool-4-thread-1] INFO io.debezium.jdbc.JdbcConnection - Connection gracefully closed
2022-09-20T08:11:18,249+0000 [pool-5-thread-1] INFO io.debezium.jdbc.JdbcConnection - Connection gracefully closed
2022-09-20T08:11:18,256+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.connection.PostgresReplicationConnection - Initializing PgOutput logical decoder publication
2022-09-20T08:11:18,268+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.util.Threads - Requested thread factory for connector PostgresConnector, id = dbserver named = keep-alive
2022-09-20T08:11:18,268+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.util.Threads - Creating thread debezium-postgresconnector-dbserver-keep-alive
2022-09-20T08:11:18,269+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource - Processing messages
2022-09-20T08:11:18,775+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.connection.WalPositionLocator - Message with LSN 'LSN{0/1886078}' arrived, switching off the filtering
```
### PostgreSQL
On the Postgres side, everything seems alright. I'm able to create and update records.
```console
$ select * from authors;
id | name | website
--------------------------------------+--------+---------
37d58e4c-8379-4fac-beb4-88f5d99adac5 | gvolpe | gvolpe.com
d35b1167-8442-4dc3-bef4-589f36b982d7 | jdoe |
(2 rows)
```
### Consumer
The consumer doesn't get any events. This is the full log.
```console
$ docker-compose exec pulsar bin/pulsar-client consume -s "cdc-authors" persistent://public/default/dbserver.trading.authors
2022-09-20T08:11:38,357+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xd4bdab64, L:/127.0.0.1:59816 - R:localhost/127.0.0.1:6650]] Connected to server
2022-09-20T08:11:38,457+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":["persistent://public/default/dbserver.trading.authors"],"topicsPattern":null,"subscriptionName":"cdc-authors","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":null,"ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":60,"regexSubscriptionMode":"PersistentOnly","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartit
ions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"keySharedPolicy":null,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":true,"startPaused":false,"maxPendingChuckedMessage":10}
2022-09-20T08:11:38,470+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650/","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":1,"numListenerThreads":1,"connectionsPerBroker":1,"useTcpNoDelay":true,"useTls":false,"tlsTrustCertsFilePath":"","tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":"","tlsTrustStorePassword":"","tlsCiphers":[],"tlsProtocols
":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null}
2022-09-20T08:11:38,479+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/dbserver.trading.authors][cdc-authors] Subscribing to topic on cnx [id: 0xd4bdab64, L:/127.0.0.1:59816 - R:localhost/127.0.0.1:6650], consumerId 0
2022-09-20T08:11:38,542+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/dbserver.trading.authors][cdc-authors] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
```
### Topics
Before I run the consumer and after I produce SQL data, these are the only topics I see.
```console
$ docker-compose exec -T pulsar bin/pulsar-admin topics list public/default
persistent://public/default/__transaction_buffer_snapshot
persistent://public/default/__change_events
persistent://public/default/debezium-postgres-source-debezium-offset-topic
persistent://public/default/debezium-postgres-topic
```
I would expect to have `persistent://public/default/dbserver.trading.authors` as well, but I don't see any producer for this topic when running the Debezium connector.
---
Has anyone out there been able to get this working with Pulsar 2.10.0 or newer?
Would appreciate any help, thanks!
GitHub link: https://github.com/apache/pulsar/discussions/17735
----
This is an automatically sent email for dev@pulsar.apache.org.
To unsubscribe, please send an email to: dev-unsubscribe@pulsar.apache.org