Posted to dev@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/20 10:15:57 UTC

[GitHub] [pulsar] gvolpe edited a discussion: [Pulsar 2.10.1] - Unable to get Debezium PostgreSQL working

GitHub user gvolpe edited a discussion: [Pulsar 2.10.1] - Unable to get Debezium PostgreSQL working

Hi,

I've followed the [documentation](https://pulsar.apache.org/docs/io-cdc-debezium/) using my own database and tables, but I'm unable to get it working.

### Docker Compose

```yaml
version: '3.6'

networks:
  app: {}

services:
  postgres:
    restart: always
    image: debezium/postgres:14-alpine
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=trading
    ports:
      - 5432:5432
    networks:
      - app
    volumes: 
      - ./modules/forecasts/src/main/resources/db/migration/V1__baseline.sql:/docker-entrypoint-initdb.d/init.sql
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres -d trading"]
      interval: 5s
      timeout: 20s
      retries: 15

  pulsar:
    restart: always
    image: apachepulsar/pulsar:2.10.0
    ports:
      - 6650:6650
      - 8080:8080
    command: >
      /bin/bash -c "bin/pulsar standalone"
    networks:
      - app
    volumes:
      - ./pulsarconf/standalone.conf:/pulsar/conf/standalone.conf
      - ./pulsarconf/connectors/pulsar-io-debezium-postgres-2.10.1.nar:/pulsar/connectors/debezium-2.10.1.nar
      - ./pulsarconf/debezium-postgres-config.yaml:/pulsar/conf/debezium-pg.yaml
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/admin/v2/brokers/health"]
      interval: 2s
      timeout: 5s
      retries: 15
      start_period: 2m
```

### Debezium Postgres config

I've noticed that `database.dbname` appears only in the Postgres example and is not listed in the configuration properties table at the top of the documentation. However, if I omit it, the connector fails to start, so I guess this is a gap in the documentation?

```yaml
tenant: "public"
namespace: "default"
name: "debezium-postgres-source"
topicName: "debezium-postgres-topic"
archive: "connectors/debezium-2.10.1.nar"
parallelism: 1

configs:
    database.hostname: "postgres"
    database.port: "5432"
    database.user: "postgres"
    database.password: "postgres"
    database.dbname: "trading"
    database.server.name: "dbserver"
    #plugin.name: "wal2json"
    plugin.name: "pgoutput"
    schema.whitelist: "trading"
    database.whitelist: "trading"
    table.whitelist: "trading.authors"
    pulsar.service.url: "pulsar://pulsar:6650"
    database.history.pulsar.service.url: "pulsar://pulsar:6650"
```

### Pulsar Debezium Connector 

Once Postgres and Pulsar are up, I run the following command to start the Pulsar Debezium connector. 

```console
$ docker-compose exec -T pulsar bin/pulsar-admin source localrun --source-config-file /pulsar/conf/debezium-pg.yaml
...
2022-09-20T08:10:22,579+0000 [pool-3-thread-1] INFO  io.debezium.jdbc.JdbcConnection - Connection gracefully closed
2022-09-20T08:10:22,604+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO  io.debezium.util.Threads - Requested thread factory for connector PostgresConnector, id = dbserver named = keep-alive
2022-09-20T08:10:22,604+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO  io.debezium.util.Threads - Creating thread debezium-postgresconnector-dbserver-keep-alive
2022-09-20T08:10:22,607+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO  io.debezium.connector.postgresql.PostgresStreamingChangeEventSource - Searching for WAL resume position
```

The lines above are the end of the connector's startup output. Once there is some activity in my database (e.g. an insert or update), I see the following log, but only for the first operation. After that nothing else happens: no more logs appear, even if I continue to insert records.

```console
2022-09-20T08:11:18,247+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO  io.debezium.connector.postgresql.connection.WalPositionLocator - First LSN 'LSN{0/1886078}' received
2022-09-20T08:11:18,247+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO  io.debezium.connector.postgresql.PostgresStreamingChangeEventSource - WAL resume position 'LSN{0/1886078}' discovered
2022-09-20T08:11:18,248+0000 [pool-4-thread-1] INFO  io.debezium.jdbc.JdbcConnection - Connection gracefully closed
2022-09-20T08:11:18,249+0000 [pool-5-thread-1] INFO  io.debezium.jdbc.JdbcConnection - Connection gracefully closed
2022-09-20T08:11:18,256+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO  io.debezium.connector.postgresql.connection.PostgresReplicationConnection - Initializing PgOutput logical decoder publication
2022-09-20T08:11:18,268+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO  io.debezium.util.Threads - Requested thread factory for connector PostgresConnector, id = dbserver named = keep-alive
2022-09-20T08:11:18,268+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO  io.debezium.util.Threads - Creating thread debezium-postgresconnector-dbserver-keep-alive
2022-09-20T08:11:18,269+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO  io.debezium.connector.postgresql.PostgresStreamingChangeEventSource - Processing messages
2022-09-20T08:11:18,775+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO  io.debezium.connector.postgresql.connection.WalPositionLocator - Message with LSN 'LSN{0/1886078}' arrived, switching off the filtering
``` 

### PostgreSQL 

On the Postgres side, everything seems alright. I'm able to create and update records.

```console
$ select * from authors;
                  id                  |  name  |  website
--------------------------------------+--------+------------
 37d58e4c-8379-4fac-beb4-88f5d99adac5 | gvolpe | gvolpe.com
 d35b1167-8442-4dc3-bef4-589f36b982d7 | jdoe   |
(2 rows)
```

### Consumer

The consumer doesn't get any events. This is the full log.

```console
$ docker-compose exec pulsar bin/pulsar-client consume -s "cdc-authors" persistent://public/default/dbserver.trading.authors
2022-09-20T08:11:38,357+0000 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xd4bdab64, L:/127.0.0.1:59816 - R:localhost/127.0.0.1:6650]] Connected to server
2022-09-20T08:11:38,457+0000 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":["persistent://public/default/dbserver.trading.authors"],"topicsPattern":null,"subscriptionName":"cdc-authors","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":null,"ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":60,"regexSubscriptionMode":"PersistentOnly","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"keySharedPolicy":null,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":true,"startPaused":false,"maxPendingChuckedMessage":10}
2022-09-20T08:11:38,470+0000 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650/","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":1,"numListenerThreads":1,"connectionsPerBroker":1,"useTcpNoDelay":true,"useTls":false,"tlsTrustCertsFilePath":"","tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":"","tlsTrustStorePassword":"","tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null}
2022-09-20T08:11:38,479+0000 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/dbserver.trading.authors][cdc-authors] Subscribing to topic on cnx [id: 0xd4bdab64, L:/127.0.0.1:59816 - R:localhost/127.0.0.1:6650], consumerId 0
2022-09-20T08:11:38,542+0000 [pulsar-client-io-1-1] INFO  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/dbserver.trading.authors][cdc-authors] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0
```

### Topics

Before I run the consumer and after I produce SQL data, these are the only topics I see.

```console
$ docker-compose exec -T pulsar bin/pulsar-admin topics list public/default
persistent://public/default/__transaction_buffer_snapshot
persistent://public/default/__change_events
persistent://public/default/debezium-postgres-source-debezium-offset-topic
persistent://public/default/debezium-postgres-topic
```

I would expect to have `persistent://public/default/dbserver.trading.authors` as well, but I don't see any producer for this topic when running the Debezium connector.
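
For reference, here is a minimal sketch of how that expected topic name is derived, assuming Debezium's usual `<database.server.name>.<schema>.<table>` naming and the `public/default` namespace from the connector config above. `debezium_topic` is a hypothetical helper written only for illustration, not a Pulsar or Debezium command:

```shell
# Hypothetical helper (not part of Pulsar or Debezium): builds the topic name
# expected for a captured table, assuming the connector publishes into the
# public/default namespace configured above.
debezium_topic() {
  server_name="$1"   # value of database.server.name
  schema="$2"        # PostgreSQL schema holding the table (not the database name)
  table="$3"         # table name
  printf 'persistent://public/default/%s.%s.%s\n' "$server_name" "$schema" "$table"
}

# Topic name I expect, if the table lives in a schema called "trading"
debezium_topic dbserver trading authors

# Topic name if the table lives in the default "public" schema instead
debezium_topic dbserver public authors
```

The middle segment is the PostgreSQL schema rather than the database name, so if `authors` was created without an explicit schema, the topic would be `dbserver.public.authors` instead.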

---

Has anyone out there been able to get this working with Pulsar 2.10.0 or newer?

Would appreciate any help, thanks! 

GitHub link: https://github.com/apache/pulsar/discussions/17735
