You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/07/06 06:25:33 UTC

[camel] branch main updated: git commit "CAMEL-16774:camel-debezium - Upgrade to 1.6" (#5796)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 317ab1f  git commit "CAMEL-16774:camel-debezium - Upgrade to 1.6" (#5796)
317ab1f is described below

commit 317ab1f872b6102adb12d45f2c9849f43be2b57c
Author: Ramu <kr...@gmail.com>
AuthorDate: Tue Jul 6 11:54:34 2021 +0530

    git commit "CAMEL-16774:camel-debezium - Upgrade to 1.6" (#5796)
    
    Co-authored-by: Kodanda Ramu Kakarla <kk...@kkakarla.pnq.csb>
---
 camel-dependencies/pom.xml                         |   4 +-
 .../catalog/docs/debezium-mysql-component.adoc     |  16 +-
 .../catalog/docs/debezium-postgres-component.adoc  |  14 +-
 .../catalog/docs/debezium-sqlserver-component.adoc |  20 ++-
 .../debezium/DebeziumMySqlComponentConfigurer.java |  31 +++-
 .../debezium/DebeziumMySqlEndpointConfigurer.java  |  31 +++-
 .../debezium/DebeziumMySqlEndpointUriFactory.java  |   6 +-
 ...ySqlConnectorEmbeddedDebeziumConfiguration.java |  70 ++++++++-
 .../camel/component/debezium/debezium-mysql.json   |  12 +-
 .../src/main/docs/debezium-mysql-component.adoc    |  16 +-
 .../DebeziumPostgresComponentConfigurer.java       |  13 +-
 .../DebeziumPostgresEndpointConfigurer.java        |  13 +-
 .../DebeziumPostgresEndpointUriFactory.java        |   3 +-
 ...gresConnectorEmbeddedDebeziumConfiguration.java |  21 ++-
 .../component/debezium/debezium-postgres.json      |  10 +-
 .../src/main/docs/debezium-postgres-component.adoc |  14 +-
 .../DebeziumSqlserverComponentConfigurer.java      |  31 +++-
 .../DebeziumSqlserverEndpointConfigurer.java       |  31 +++-
 .../DebeziumSqlserverEndpointUriFactory.java       |   6 +-
 ...rverConnectorEmbeddedDebeziumConfiguration.java |  75 +++++++++-
 .../component/debezium/debezium-sqlserver.json     |  16 +-
 .../main/docs/debezium-sqlserver-component.adoc    |  20 ++-
 .../dsl/DebeziumMysqlComponentBuilderFactory.java  |  80 +++++++++-
 .../DebeziumPostgresComponentBuilderFactory.java   |  24 ++-
 .../DebeziumSqlserverComponentBuilderFactory.java  |  86 ++++++++++-
 .../dsl/DebeziumMySqlEndpointBuilderFactory.java   | 153 +++++++++++++++++++-
 .../DebeziumPostgresEndpointBuilderFactory.java    |  39 ++++-
 .../DebeziumSqlserverEndpointBuilderFactory.java   | 161 ++++++++++++++++++++-
 .../ROOT/pages/debezium-mysql-component.adoc       |  16 +-
 .../ROOT/pages/debezium-postgres-component.adoc    |  14 +-
 .../ROOT/pages/debezium-sqlserver-component.adoc   |  20 ++-
 parent/pom.xml                                     |   4 +-
 32 files changed, 945 insertions(+), 125 deletions(-)

diff --git a/camel-dependencies/pom.xml b/camel-dependencies/pom.xml
index 50016a1..faa33ae 100644
--- a/camel-dependencies/pom.xml
+++ b/camel-dependencies/pom.xml
@@ -160,8 +160,8 @@
     <cxf.codegenplugin.forkmode>once</cxf.codegenplugin.forkmode>
     <cxf.xjc.jvmArgs></cxf.xjc.jvmArgs>
     <datasonnet-mapper-version>2.1.2</datasonnet-mapper-version>
-    <debezium-mysql-connector-version>8.0.22</debezium-mysql-connector-version>
-    <debezium-version>1.5.2.Final</debezium-version>
+    <debezium-mysql-connector-version>8.0.25</debezium-mysql-connector-version>
+    <debezium-version>1.6.0.Final</debezium-version>
     <deltaspike-version>1.9.4</deltaspike-version>
     <depends-maven-plugin-version>1.4.0</depends-maven-plugin-version>
     <derby-version>10.14.2.0</derby-version>
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/debezium-mysql-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/debezium-mysql-component.adoc
index 7cc5b62..7d8e9a8 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/debezium-mysql-component.adoc
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/debezium-mysql-component.adoc
@@ -56,7 +56,7 @@ debezium-mysql:name[?options]
 
 
 // component options: START
-The Debezium MySQL Connector component supports 88 options, which are listed below.
+The Debezium MySQL Connector component supports 92 options, which are listed below.
 
 
 
@@ -95,6 +95,9 @@ The Debezium MySQL Connector component supports 88 options, which are listed bel
 | *databaseHistoryKafkaRecovery{zwsp}Attempts* (mysql) | The number of attempts in a row that no data are returned from Kafka before recover completes. The maximum amount of time to wait after receiving no data is (recovery.attempts) x (recovery.poll.interval.ms). | 100 | int
 | *databaseHistoryKafkaRecovery{zwsp}PollIntervalMs* (mysql) | The number of milliseconds to wait while polling for persisted data during recovery. | 100ms | int
 | *databaseHistoryKafkaTopic* (mysql) | The name of the topic for the database schema history |  | String
+| *databaseHistorySkipUnparseable{zwsp}Ddl* (mysql) | Controls the action Debezium will take when it meets a DDL statement in binlog that it cannot parse. By default the connector will stop operating, but by changing the setting it can ignore the statements which it cannot parse. If skipping is enabled then Debezium can miss metadata changes. | false | boolean
+| *databaseHistoryStoreOnly{zwsp}CapturedTablesDdl* (mysql) | Controls what DDL Debezium will store in database history. By default (false) Debezium will store all incoming DDL statements. If set to true, then only DDL that manipulates a captured table will be stored. | false | boolean
+| *databaseHistoryStoreOnly{zwsp}MonitoredTablesDdl* (mysql) | Controls what DDL Debezium will store in database history. By default (false) Debezium will store all incoming DDL statements. If set to true, then only DDL that manipulates a monitored table will be stored (deprecated, use database.history.store.only.captured.tables.ddl instead) | false | boolean
 | *databaseHostname* (mysql) | Resolvable hostname or IP address of the database server. |  | String
 | *databaseIncludeList* (mysql) | The databases for which changes are to be captured |  | String
 | *databaseInitialStatements* (mysql) | A semicolon separated list of SQL statements to be executed when a JDBC connection (not binlog reading connection) to the database is established. Note that the connector may establish JDBC connections at its own discretion, so this should typically be used for configuration of session parameters only,but not for executing DML statements. Use doubled semicolon (';;') to use a semicolon as a character and not as a delimiter. |  | String
@@ -123,6 +126,7 @@ The Debezium MySQL Connector component supports 88 options, which are listed bel
 | *includeQuery* (mysql) | Whether the connector should include the original SQL query that generated the change event. Note: This option requires MySQL be configured with the binlog_rows_query_log_events option set to ON. Query will not be present for events generated from snapshot. WARNING: Enabling this option may expose tables or fields explicitly blacklisted or masked by including the original SQL statement in the change event. For this reason the default value is 'false'. | false | [...]
 | *includeSchemaChanges* (mysql) | Whether the connector should publish changes in the database schema to a Kafka topic with the same name as the database server ID. Each schema change will be recorded using a key that contains the database name and whose value include logical description of the new schema and optionally the DDL statement(s).The default is 'true'. This is independent of how the connector internally records database history. | true | boolean
 | *inconsistentSchemaHandlingMode* (mysql) | Specify how binlog events that belong to a table missing from internal schema representation (i.e. internal representation is not consistent with database) should be handled, including:'fail' (the default) an exception indicating the problematic event and its binlog position is raised, causing the connector to be stopped; 'warn' the problematic event and its binlog position will be logged and the event will be skipped;'skip' the problematic ev [...]
+| *incrementalSnapshotChunkSize* (mysql) | The maximum size of chunk for incremental snapshotting | 1024 | int
 | *maxBatchSize* (mysql) | Maximum size of each batch of source records. Defaults to 2048. | 2048 | int
 | *maxQueueSize* (mysql) | Maximum size of the queue for change events read from the database log but not yet recorded or forwarded. Defaults to 8192, and should always be larger than the maximum batch size. | 8192 | int
 | *maxQueueSizeInBytes* (mysql) | Maximum size of the queue in bytes for change events read from the database log but not yet recorded or forwarded. Defaults to 0. Mean the feature is not enabled | 0 | long
@@ -142,7 +146,7 @@ The Debezium MySQL Connector component supports 88 options, which are listed bel
 | *snapshotMaxThreads* (mysql) | The maximum number of threads used to perform the snapshot. Defaults to 1. | 1 | int
 | *snapshotMode* (mysql) | The criteria for running a snapshot upon startup of the connector. Options include: 'when_needed' to specify that the connector run a snapshot upon startup whenever it deems it necessary; 'schema_only' to only take a snapshot of the schema (table structures) but no actual data; 'initial' (the default) to specify the connector can run a snapshot only when no offsets are available for the logical server name; 'initial_only' same as 'initial' except the connector  [...]
 | *snapshotNewTables* (mysql) | BETA FEATURE: On connector restart, the connector will check if there have been any new tables added to the configuration, and snapshot them. There is presently only two options:'off': Default behavior. Do not snapshot new tables.'parallel': The snapshot of the new tables will occur in parallel to the continued binlog reading of the old tables. When the snapshot completes, an independent binlog reader will begin reading the events for the new tables until  [...]
-| *snapshotSelectStatement{zwsp}Overrides* (mysql) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on thespecific connectors . Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The val [...]
+| *snapshotSelectStatement{zwsp}Overrides* (mysql) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on the specific connector. Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The valu [...]
 | *sourceStructVersion* (mysql) | A version of the format of the publicly visible source part in the message | v2 | String
 | *tableBlacklist* (mysql) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring (deprecated, use table.exclude.list instead) |  | String
 | *tableExcludeList* (mysql) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring |  | String
@@ -174,7 +178,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (88 parameters):
+=== Query Parameters (92 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -212,6 +216,9 @@ with the following path and query parameters:
 | *databaseHistoryKafkaRecovery{zwsp}Attempts* (mysql) | The number of attempts in a row that no data are returned from Kafka before recover completes. The maximum amount of time to wait after receiving no data is (recovery.attempts) x (recovery.poll.interval.ms). | 100 | int
 | *databaseHistoryKafkaRecovery{zwsp}PollIntervalMs* (mysql) | The number of milliseconds to wait while polling for persisted data during recovery. | 100ms | int
 | *databaseHistoryKafkaTopic* (mysql) | The name of the topic for the database schema history |  | String
+| *databaseHistorySkipUnparseable{zwsp}Ddl* (mysql) | Controls the action Debezium will take when it meets a DDL statement in binlog that it cannot parse. By default the connector will stop operating, but by changing the setting it can ignore the statements which it cannot parse. If skipping is enabled then Debezium can miss metadata changes. | false | boolean
+| *databaseHistoryStoreOnly{zwsp}CapturedTablesDdl* (mysql) | Controls what DDL Debezium will store in database history. By default (false) Debezium will store all incoming DDL statements. If set to true, then only DDL that manipulates a captured table will be stored. | false | boolean
+| *databaseHistoryStoreOnly{zwsp}MonitoredTablesDdl* (mysql) | Controls what DDL Debezium will store in database history. By default (false) Debezium will store all incoming DDL statements. If set to true, then only DDL that manipulates a monitored table will be stored (deprecated, use database.history.store.only.captured.tables.ddl instead) | false | boolean
 | *databaseHostname* (mysql) | Resolvable hostname or IP address of the database server. |  | String
 | *databaseIncludeList* (mysql) | The databases for which changes are to be captured |  | String
 | *databaseInitialStatements* (mysql) | A semicolon separated list of SQL statements to be executed when a JDBC connection (not binlog reading connection) to the database is established. Note that the connector may establish JDBC connections at its own discretion, so this should typically be used for configuration of session parameters only,but not for executing DML statements. Use doubled semicolon (';;') to use a semicolon as a character and not as a delimiter. |  | String
@@ -240,6 +247,7 @@ with the following path and query parameters:
 | *includeQuery* (mysql) | Whether the connector should include the original SQL query that generated the change event. Note: This option requires MySQL be configured with the binlog_rows_query_log_events option set to ON. Query will not be present for events generated from snapshot. WARNING: Enabling this option may expose tables or fields explicitly blacklisted or masked by including the original SQL statement in the change event. For this reason the default value is 'false'. | false | [...]
 | *includeSchemaChanges* (mysql) | Whether the connector should publish changes in the database schema to a Kafka topic with the same name as the database server ID. Each schema change will be recorded using a key that contains the database name and whose value include logical description of the new schema and optionally the DDL statement(s).The default is 'true'. This is independent of how the connector internally records database history. | true | boolean
 | *inconsistentSchemaHandlingMode* (mysql) | Specify how binlog events that belong to a table missing from internal schema representation (i.e. internal representation is not consistent with database) should be handled, including:'fail' (the default) an exception indicating the problematic event and its binlog position is raised, causing the connector to be stopped; 'warn' the problematic event and its binlog position will be logged and the event will be skipped;'skip' the problematic ev [...]
+| *incrementalSnapshotChunkSize* (mysql) | The maximum size of chunk for incremental snapshotting | 1024 | int
 | *maxBatchSize* (mysql) | Maximum size of each batch of source records. Defaults to 2048. | 2048 | int
 | *maxQueueSize* (mysql) | Maximum size of the queue for change events read from the database log but not yet recorded or forwarded. Defaults to 8192, and should always be larger than the maximum batch size. | 8192 | int
 | *maxQueueSizeInBytes* (mysql) | Maximum size of the queue in bytes for change events read from the database log but not yet recorded or forwarded. Defaults to 0. Mean the feature is not enabled | 0 | long
@@ -259,7 +267,7 @@ with the following path and query parameters:
 | *snapshotMaxThreads* (mysql) | The maximum number of threads used to perform the snapshot. Defaults to 1. | 1 | int
 | *snapshotMode* (mysql) | The criteria for running a snapshot upon startup of the connector. Options include: 'when_needed' to specify that the connector run a snapshot upon startup whenever it deems it necessary; 'schema_only' to only take a snapshot of the schema (table structures) but no actual data; 'initial' (the default) to specify the connector can run a snapshot only when no offsets are available for the logical server name; 'initial_only' same as 'initial' except the connector  [...]
 | *snapshotNewTables* (mysql) | BETA FEATURE: On connector restart, the connector will check if there have been any new tables added to the configuration, and snapshot them. There is presently only two options:'off': Default behavior. Do not snapshot new tables.'parallel': The snapshot of the new tables will occur in parallel to the continued binlog reading of the old tables. When the snapshot completes, an independent binlog reader will begin reading the events for the new tables until  [...]
-| *snapshotSelectStatement{zwsp}Overrides* (mysql) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on thespecific connectors . Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The val [...]
+| *snapshotSelectStatement{zwsp}Overrides* (mysql) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on the specific connector. Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The valu [...]
 | *sourceStructVersion* (mysql) | A version of the format of the publicly visible source part in the message | v2 | String
 | *tableBlacklist* (mysql) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring (deprecated, use table.exclude.list instead) |  | String
 | *tableExcludeList* (mysql) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring |  | String
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/debezium-postgres-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/debezium-postgres-component.adoc
index 3822935..6ba3026 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/debezium-postgres-component.adoc
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/debezium-postgres-component.adoc
@@ -47,7 +47,7 @@ debezium-postgres:name[?options]
 
 
 // component options: START
-The Debezium PostgresSQL Connector component supports 88 options, which are listed below.
+The Debezium PostgresSQL Connector component supports 89 options, which are listed below.
 
 
 
@@ -98,6 +98,7 @@ The Debezium PostgresSQL Connector component supports 88 options, which are list
 | *heartbeatTopicsPrefix* (postgres) | The prefix that is used to name heartbeat topics.Defaults to __debezium-heartbeat. | __debezium-heartbeat | String
 | *hstoreHandlingMode* (postgres) | Specify how HSTORE columns should be represented in change events, including:'json' represents values as string-ified JSON (default)'map' represents values as a key/value map | json | String
 | *includeUnknownDatatypes* (postgres) | Specify whether the fields of data type not supported by Debezium should be processed:'false' (the default) omits the fields; 'true' converts the field into an implementation dependent binary representation. | false | boolean
+| *incrementalSnapshotChunkSize* (postgres) | The maximum size of chunk for incremental snapshotting | 1024 | int
 | *intervalHandlingMode* (postgres) | Specify how INTERVAL columns should be represented in change events, including:'string' represents values as an exact ISO formatted string'numeric' (default) represents values using the inexact conversion into microseconds | numeric | String
 | *maxBatchSize* (postgres) | Maximum size of each batch of source records. Defaults to 2048. | 2048 | int
 | *maxQueueSize* (postgres) | Maximum size of the queue for change events read from the database log but not yet recorded or forwarded. Defaults to 8192, and should always be larger than the maximum batch size. | 8192 | int
@@ -129,8 +130,8 @@ The Debezium PostgresSQL Connector component supports 88 options, which are list
 | *snapshotIncludeCollectionList* (postgres) | this setting must be set to specify a list of tables/collections whose snapshot must be taken on creating or restarting the connector. |  | String
 | *snapshotLockTimeoutMs* (postgres) | The maximum number of millis to wait for table locks at the beginning of a snapshot. If locks cannot be acquired in this time frame, the snapshot will be aborted. Defaults to 10 seconds | 10s | long
 | *snapshotMaxThreads* (postgres) | The maximum number of threads used to perform the snapshot. Defaults to 1. | 1 | int
-| *snapshotMode* (postgres) | The criteria for running a snapshot upon startup of the connector. Options include: 'always' to specify that the connector run a snapshot each time it starts up; 'initial' (the default) to specify the connector can run a snapshot only when no offsets are available for the logical server name; 'initial_only' same as 'initial' except the connector should stop after completing the snapshot and before it would normally start emitting changes;'never' to specify t [...]
-| *snapshotSelectStatement{zwsp}Overrides* (postgres) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on thespecific connectors . Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The  [...]
+| *snapshotMode* (postgres) | The criteria for running a snapshot upon startup of the connector. Options include: 'always' to specify that the connector run a snapshot each time it starts up; 'initial' (the default) to specify the connector can run a snapshot only when no offsets are available for the logical server name; 'initial_only' same as 'initial' except the connector should stop after completing the snapshot and before it would normally start emitting changes;'never' to specify t [...]
+| *snapshotSelectStatement{zwsp}Overrides* (postgres) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on the specific connector. Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The v [...]
 | *sourceStructVersion* (postgres) | A version of the format of the publicly visible source part in the message | v2 | String
 | *statusUpdateIntervalMs* (postgres) | Frequency for sending replication connection status updates to the server, given in milliseconds. Defaults to 10 seconds (10,000 ms). | 10s | int
 | *tableBlacklist* (postgres) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring (deprecated, use table.exclude.list instead) |  | String
@@ -165,7 +166,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (88 parameters):
+=== Query Parameters (89 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -215,6 +216,7 @@ with the following path and query parameters:
 | *heartbeatTopicsPrefix* (postgres) | The prefix that is used to name heartbeat topics.Defaults to __debezium-heartbeat. | __debezium-heartbeat | String
 | *hstoreHandlingMode* (postgres) | Specify how HSTORE columns should be represented in change events, including:'json' represents values as string-ified JSON (default)'map' represents values as a key/value map | json | String
 | *includeUnknownDatatypes* (postgres) | Specify whether the fields of data type not supported by Debezium should be processed:'false' (the default) omits the fields; 'true' converts the field into an implementation dependent binary representation. | false | boolean
+| *incrementalSnapshotChunkSize* (postgres) | The maximum size of chunk for incremental snapshotting | 1024 | int
 | *intervalHandlingMode* (postgres) | Specify how INTERVAL columns should be represented in change events, including:'string' represents values as an exact ISO formatted string'numeric' (default) represents values using the inexact conversion into microseconds | numeric | String
 | *maxBatchSize* (postgres) | Maximum size of each batch of source records. Defaults to 2048. | 2048 | int
 | *maxQueueSize* (postgres) | Maximum size of the queue for change events read from the database log but not yet recorded or forwarded. Defaults to 8192, and should always be larger than the maximum batch size. | 8192 | int
@@ -246,8 +248,8 @@ with the following path and query parameters:
 | *snapshotIncludeCollectionList* (postgres) | this setting must be set to specify a list of tables/collections whose snapshot must be taken on creating or restarting the connector. |  | String
 | *snapshotLockTimeoutMs* (postgres) | The maximum number of millis to wait for table locks at the beginning of a snapshot. If locks cannot be acquired in this time frame, the snapshot will be aborted. Defaults to 10 seconds | 10s | long
 | *snapshotMaxThreads* (postgres) | The maximum number of threads used to perform the snapshot. Defaults to 1. | 1 | int
-| *snapshotMode* (postgres) | The criteria for running a snapshot upon startup of the connector. Options include: 'always' to specify that the connector run a snapshot each time it starts up; 'initial' (the default) to specify the connector can run a snapshot only when no offsets are available for the logical server name; 'initial_only' same as 'initial' except the connector should stop after completing the snapshot and before it would normally start emitting changes;'never' to specify t [...]
-| *snapshotSelectStatement{zwsp}Overrides* (postgres) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on thespecific connectors . Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The  [...]
+| *snapshotMode* (postgres) | The criteria for running a snapshot upon startup of the connector. Options include: 'always' to specify that the connector run a snapshot each time it starts up; 'initial' (the default) to specify the connector can run a snapshot only when no offsets are available for the logical server name; 'initial_only' same as 'initial' except the connector should stop after completing the snapshot and before it would normally start emitting changes;'never' to specify t [...]
+| *snapshotSelectStatement{zwsp}Overrides* (postgres) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on the specific connector. Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The v [...]
 | *sourceStructVersion* (postgres) | A version of the format of the publicly visible source part in the message | v2 | String
 | *statusUpdateIntervalMs* (postgres) | Frequency for sending replication connection status updates to the server, given in milliseconds. Defaults to 10 seconds (10,000 ms). | 10s | int
 | *tableBlacklist* (postgres) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring (deprecated, use table.exclude.list instead) |  | String
diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/debezium-sqlserver-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/debezium-sqlserver-component.adoc
index 51b14c8..173c349 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/debezium-sqlserver-component.adoc
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/debezium-sqlserver-component.adoc
@@ -46,7 +46,7 @@ debezium-sqlserver:name[?options]
 
 
 // component options: START
-The Debezium SQL Server Connector component supports 69 options, which are listed below.
+The Debezium SQL Server Connector component supports 73 options, which are listed below.
 
 
 
@@ -81,6 +81,9 @@ The Debezium SQL Server Connector component supports 69 options, which are liste
 | *databaseHistoryKafkaRecovery{zwsp}Attempts* (sqlserver) | The number of attempts in a row that no data are returned from Kafka before recover completes. The maximum amount of time to wait after receiving no data is (recovery.attempts) x (recovery.poll.interval.ms). | 100 | int
 | *databaseHistoryKafkaRecovery{zwsp}PollIntervalMs* (sqlserver) | The number of milliseconds to wait while polling for persisted data during recovery. | 100ms | int
 | *databaseHistoryKafkaTopic* (sqlserver) | The name of the topic for the database schema history |  | String
+| *databaseHistorySkipUnparseable{zwsp}Ddl* (sqlserver) | Controls the action Debezium will take when it meets a DDL statement in binlog that it cannot parse. By default the connector will stop operating, but by changing the setting it can ignore the statements which it cannot parse. If skipping is enabled then Debezium can miss metadata changes. | false | boolean
+| *databaseHistoryStoreOnly{zwsp}CapturedTablesDdl* (sqlserver) | Controls what DDL Debezium will store in database history. By default (false) Debezium will store all incoming DDL statements. If set to true, then only DDL that manipulates a captured table will be stored. | false | boolean
+| *databaseHistoryStoreOnly{zwsp}MonitoredTablesDdl* (sqlserver) | Controls what DDL Debezium will store in database history. By default (false) Debezium will store all incoming DDL statements. If set to true, then only DDL that manipulates a monitored table will be stored (deprecated, use database.history.store.only.captured.tables.ddl instead) | false | boolean
 | *databaseHostname* (sqlserver) | Resolvable hostname or IP address of the database server. |  | String
 | *databaseInstance* (sqlserver) | The SQL Server instance name |  | String
 | *databasePassword* (sqlserver) | *Required* Password of the database user to be used when connecting to the database. |  | String
@@ -95,6 +98,7 @@ The Debezium SQL Server Connector component supports 69 options, which are liste
 | *heartbeatTopicsPrefix* (sqlserver) | The prefix that is used to name heartbeat topics.Defaults to __debezium-heartbeat. | __debezium-heartbeat | String
 | *includeSchemaChanges* (sqlserver) | Whether the connector should publish changes in the database schema to a Kafka topic with the same name as the database server ID. Each schema change will be recorded using a key that contains the database name and whose value include logical description of the new schema and optionally the DDL statement(s).The default is 'true'. This is independent of how the connector internally records database history. | true | boolean
 | *maxBatchSize* (sqlserver) | Maximum size of each batch of source records. Defaults to 2048. | 2048 | int
+| *maxIterationTransactions* (sqlserver) | This property can be used to reduce the connector memory usage footprint when changes are streamed from multiple tables per database. | 0 | int
 | *maxQueueSize* (sqlserver) | Maximum size of the queue for change events read from the database log but not yet recorded or forwarded. Defaults to 8192, and should always be larger than the maximum batch size. | 8192 | int
 | *maxQueueSizeInBytes* (sqlserver) | Maximum size of the queue in bytes for change events read from the database log but not yet recorded or forwarded. Defaults to 0. Mean the feature is not enabled | 0 | long
 | *messageKeyColumns* (sqlserver) | A semicolon-separated list of expressions that match fully-qualified tables and column(s) to be used as message key. Each expression must match the pattern ':',where the table names could be defined as (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on the specific connector,and the key columns are a comma-separated list of columns representing the custom key. For any table without an explicit key configuration the table's primary key colum [...]
@@ -112,9 +116,9 @@ The Debezium SQL Server Connector component supports 69 options, which are liste
 | *snapshotLockTimeoutMs* (sqlserver) | The maximum number of millis to wait for table locks at the beginning of a snapshot. If locks cannot be acquired in this time frame, the snapshot will be aborted. Defaults to 10 seconds | 10s | long
 | *snapshotMaxThreads* (sqlserver) | The maximum number of threads used to perform the snapshot. Defaults to 1. | 1 | int
 | *snapshotMode* (sqlserver) | The criteria for running a snapshot upon startup of the connector. Options include: 'initial' (the default) to specify the connector should run a snapshot only when no offsets are available for the logical server name; 'schema_only' to specify the connector should run a snapshot of the schema when no offsets are available for the logical server name. | initial | String
-| *snapshotSelectStatement{zwsp}Overrides* (sqlserver) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on thespecific connectors . Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The [...]
+| *snapshotSelectStatement{zwsp}Overrides* (sqlserver) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on the specific connectors. Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The  [...]
 | *sourceStructVersion* (sqlserver) | A version of the format of the publicly visible source part in the message | v2 | String
-| *sourceTimestampMode* (sqlserver) | Configures the criteria of the attached timestamp within the source record (ts_ms).Options include:'commit', (default) the source timestamp is set to the instant where the record was committed in the database'processing', the source timestamp is set to the instant where the record was processed by Debezium. | commit | String
+| *sourceTimestampMode* (sqlserver) | Configures the criteria of the attached timestamp within the source record (ts_ms). Options include: 'commit' (default), the source timestamp is set to the instant where the record was committed in the database; 'processing' (deprecated), the source timestamp is set to the instant where the record was processed by Debezium. | commit | String
 | *tableBlacklist* (sqlserver) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring (deprecated, use table.exclude.list instead) |  | String
 | *tableExcludeList* (sqlserver) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring |  | String
 | *tableIgnoreBuiltin* (sqlserver) | Flag specifying whether built-in tables should be ignored. | true | boolean
@@ -145,7 +149,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (69 parameters):
+=== Query Parameters (73 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -179,6 +183,9 @@ with the following path and query parameters:
 | *databaseHistoryKafkaRecovery{zwsp}Attempts* (sqlserver) | The number of attempts in a row that no data are returned from Kafka before recover completes. The maximum amount of time to wait after receiving no data is (recovery.attempts) x (recovery.poll.interval.ms). | 100 | int
 | *databaseHistoryKafkaRecovery{zwsp}PollIntervalMs* (sqlserver) | The number of milliseconds to wait while polling for persisted data during recovery. | 100ms | int
 | *databaseHistoryKafkaTopic* (sqlserver) | The name of the topic for the database schema history |  | String
+| *databaseHistorySkipUnparseable{zwsp}Ddl* (sqlserver) | Controls the action Debezium will take when it meets a DDL statement in binlog, that it cannot parse. By default the connector will stop operating but by changing the setting it can ignore the statements which it cannot parse. If skipping is enabled then Debezium can miss metadata changes. | false | boolean
+| *databaseHistoryStoreOnly{zwsp}CapturedTablesDdl* (sqlserver) | Controls what DDL will Debezium store in database history. By default (false) Debezium will store all incoming DDL statements. If set to true, then only DDL that manipulates a captured table will be stored. | false | boolean
+| *databaseHistoryStoreOnly{zwsp}MonitoredTablesDdl* (sqlserver) | Controls what DDL will Debezium store in database history. By default (false) Debezium will store all incoming DDL statements. If set to true, then only DDL that manipulates a monitored table will be stored (deprecated, use database.history.store.only.captured.tables.ddl instead) | false | boolean
 | *databaseHostname* (sqlserver) | Resolvable hostname or IP address of the database server. |  | String
 | *databaseInstance* (sqlserver) | The SQL Server instance name |  | String
 | *databasePassword* (sqlserver) | *Required* Password of the database user to be used when connecting to the database. |  | String
@@ -193,6 +200,7 @@ with the following path and query parameters:
 | *heartbeatTopicsPrefix* (sqlserver) | The prefix that is used to name heartbeat topics.Defaults to __debezium-heartbeat. | __debezium-heartbeat | String
 | *includeSchemaChanges* (sqlserver) | Whether the connector should publish changes in the database schema to a Kafka topic with the same name as the database server ID. Each schema change will be recorded using a key that contains the database name and whose value include logical description of the new schema and optionally the DDL statement(s).The default is 'true'. This is independent of how the connector internally records database history. | true | boolean
 | *maxBatchSize* (sqlserver) | Maximum size of each batch of source records. Defaults to 2048. | 2048 | int
+| *maxIterationTransactions* (sqlserver) | This property can be used to reduce the connector memory usage footprint when changes are streamed from multiple tables per database. | 0 | int
 | *maxQueueSize* (sqlserver) | Maximum size of the queue for change events read from the database log but not yet recorded or forwarded. Defaults to 8192, and should always be larger than the maximum batch size. | 8192 | int
 | *maxQueueSizeInBytes* (sqlserver) | Maximum size of the queue in bytes for change events read from the database log but not yet recorded or forwarded. Defaults to 0. Mean the feature is not enabled | 0 | long
 | *messageKeyColumns* (sqlserver) | A semicolon-separated list of expressions that match fully-qualified tables and column(s) to be used as message key. Each expression must match the pattern ':',where the table names could be defined as (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on the specific connector,and the key columns are a comma-separated list of columns representing the custom key. For any table without an explicit key configuration the table's primary key colum [...]
@@ -210,9 +218,9 @@ with the following path and query parameters:
 | *snapshotLockTimeoutMs* (sqlserver) | The maximum number of millis to wait for table locks at the beginning of a snapshot. If locks cannot be acquired in this time frame, the snapshot will be aborted. Defaults to 10 seconds | 10s | long
 | *snapshotMaxThreads* (sqlserver) | The maximum number of threads used to perform the snapshot. Defaults to 1. | 1 | int
 | *snapshotMode* (sqlserver) | The criteria for running a snapshot upon startup of the connector. Options include: 'initial' (the default) to specify the connector should run a snapshot only when no offsets are available for the logical server name; 'schema_only' to specify the connector should run a snapshot of the schema when no offsets are available for the logical server name. | initial | String
-| *snapshotSelectStatement{zwsp}Overrides* (sqlserver) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on thespecific connectors . Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The [...]
+| *snapshotSelectStatement{zwsp}Overrides* (sqlserver) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on the specific connectors. Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The  [...]
 | *sourceStructVersion* (sqlserver) | A version of the format of the publicly visible source part in the message | v2 | String
-| *sourceTimestampMode* (sqlserver) | Configures the criteria of the attached timestamp within the source record (ts_ms).Options include:'commit', (default) the source timestamp is set to the instant where the record was committed in the database'processing', the source timestamp is set to the instant where the record was processed by Debezium. | commit | String
+| *sourceTimestampMode* (sqlserver) | Configures the criteria of the attached timestamp within the source record (ts_ms). Options include: 'commit' (default), the source timestamp is set to the instant where the record was committed in the database; 'processing' (deprecated), the source timestamp is set to the instant where the record was processed by Debezium. | commit | String
 | *tableBlacklist* (sqlserver) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring (deprecated, use table.exclude.list instead) |  | String
 | *tableExcludeList* (sqlserver) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring |  | String
 | *tableIgnoreBuiltin* (sqlserver) | Flag specifying whether built-in tables should be ignored. | true | boolean
diff --git a/components/camel-debezium/camel-debezium-mysql/src/generated/java/org/apache/camel/component/debezium/DebeziumMySqlComponentConfigurer.java b/components/camel-debezium/camel-debezium-mysql/src/generated/java/org/apache/camel/component/debezium/DebeziumMySqlComponentConfigurer.java
index 8575cdf..88322e1 100644
--- a/components/camel-debezium/camel-debezium-mysql/src/generated/java/org/apache/camel/component/debezium/DebeziumMySqlComponentConfigurer.java
+++ b/components/camel-debezium/camel-debezium-mysql/src/generated/java/org/apache/camel/component/debezium/DebeziumMySqlComponentConfigurer.java
@@ -1,14 +1,9 @@
 /* Generated by camel build tools - do NOT edit this file! */
 package org.apache.camel.component.debezium;
 
-import java.util.Map;
-
 import org.apache.camel.CamelContext;
-import org.apache.camel.spi.ExtendedPropertyConfigurerGetter;
-import org.apache.camel.spi.PropertyConfigurerGetter;
-import org.apache.camel.spi.ConfigurerStrategy;
 import org.apache.camel.spi.GeneratedPropertyConfigurer;
-import org.apache.camel.util.CaseInsensitiveMap;
+import org.apache.camel.spi.PropertyConfigurerGetter;
 import org.apache.camel.support.component.PropertyConfigurerSupport;
 
 /**
@@ -70,6 +65,12 @@ public class DebeziumMySqlComponentConfigurer extends PropertyConfigurerSupport
         case "databaseHistoryKafkaRecoveryPollIntervalMs": getOrCreateConfiguration(target).setDatabaseHistoryKafkaRecoveryPollIntervalMs(property(camelContext, int.class, value)); return true;
         case "databasehistorykafkatopic":
         case "databaseHistoryKafkaTopic": getOrCreateConfiguration(target).setDatabaseHistoryKafkaTopic(property(camelContext, java.lang.String.class, value)); return true;
+        case "databasehistoryskipunparseableddl":
+        case "databaseHistorySkipUnparseableDdl": getOrCreateConfiguration(target).setDatabaseHistorySkipUnparseableDdl(property(camelContext, boolean.class, value)); return true;
+        case "databasehistorystoreonlycapturedtablesddl":
+        case "databaseHistoryStoreOnlyCapturedTablesDdl": getOrCreateConfiguration(target).setDatabaseHistoryStoreOnlyCapturedTablesDdl(property(camelContext, boolean.class, value)); return true;
+        case "databasehistorystoreonlymonitoredtablesddl":
+        case "databaseHistoryStoreOnlyMonitoredTablesDdl": getOrCreateConfiguration(target).setDatabaseHistoryStoreOnlyMonitoredTablesDdl(property(camelContext, boolean.class, value)); return true;
         case "databasehostname":
         case "databaseHostname": getOrCreateConfiguration(target).setDatabaseHostname(property(camelContext, java.lang.String.class, value)); return true;
         case "databaseincludelist":
@@ -126,6 +127,8 @@ public class DebeziumMySqlComponentConfigurer extends PropertyConfigurerSupport
         case "includeSchemaChanges": getOrCreateConfiguration(target).setIncludeSchemaChanges(property(camelContext, boolean.class, value)); return true;
         case "inconsistentschemahandlingmode":
         case "inconsistentSchemaHandlingMode": getOrCreateConfiguration(target).setInconsistentSchemaHandlingMode(property(camelContext, java.lang.String.class, value)); return true;
+        case "incrementalsnapshotchunksize":
+        case "incrementalSnapshotChunkSize": getOrCreateConfiguration(target).setIncrementalSnapshotChunkSize(property(camelContext, int.class, value)); return true;
         case "internalkeyconverter":
         case "internalKeyConverter": getOrCreateConfiguration(target).setInternalKeyConverter(property(camelContext, java.lang.String.class, value)); return true;
         case "internalvalueconverter":
@@ -251,6 +254,12 @@ public class DebeziumMySqlComponentConfigurer extends PropertyConfigurerSupport
         case "databaseHistoryKafkaRecoveryPollIntervalMs": return int.class;
         case "databasehistorykafkatopic":
         case "databaseHistoryKafkaTopic": return java.lang.String.class;
+        case "databasehistoryskipunparseableddl":
+        case "databaseHistorySkipUnparseableDdl": return boolean.class;
+        case "databasehistorystoreonlycapturedtablesddl":
+        case "databaseHistoryStoreOnlyCapturedTablesDdl": return boolean.class;
+        case "databasehistorystoreonlymonitoredtablesddl":
+        case "databaseHistoryStoreOnlyMonitoredTablesDdl": return boolean.class;
         case "databasehostname":
         case "databaseHostname": return java.lang.String.class;
         case "databaseincludelist":
@@ -307,6 +316,8 @@ public class DebeziumMySqlComponentConfigurer extends PropertyConfigurerSupport
         case "includeSchemaChanges": return boolean.class;
         case "inconsistentschemahandlingmode":
         case "inconsistentSchemaHandlingMode": return java.lang.String.class;
+        case "incrementalsnapshotchunksize":
+        case "incrementalSnapshotChunkSize": return int.class;
         case "internalkeyconverter":
         case "internalKeyConverter": return java.lang.String.class;
         case "internalvalueconverter":
@@ -433,6 +444,12 @@ public class DebeziumMySqlComponentConfigurer extends PropertyConfigurerSupport
         case "databaseHistoryKafkaRecoveryPollIntervalMs": return getOrCreateConfiguration(target).getDatabaseHistoryKafkaRecoveryPollIntervalMs();
         case "databasehistorykafkatopic":
         case "databaseHistoryKafkaTopic": return getOrCreateConfiguration(target).getDatabaseHistoryKafkaTopic();
+        case "databasehistoryskipunparseableddl":
+        case "databaseHistorySkipUnparseableDdl": return getOrCreateConfiguration(target).isDatabaseHistorySkipUnparseableDdl();
+        case "databasehistorystoreonlycapturedtablesddl":
+        case "databaseHistoryStoreOnlyCapturedTablesDdl": return getOrCreateConfiguration(target).isDatabaseHistoryStoreOnlyCapturedTablesDdl();
+        case "databasehistorystoreonlymonitoredtablesddl":
+        case "databaseHistoryStoreOnlyMonitoredTablesDdl": return getOrCreateConfiguration(target).isDatabaseHistoryStoreOnlyMonitoredTablesDdl();
         case "databasehostname":
         case "databaseHostname": return getOrCreateConfiguration(target).getDatabaseHostname();
         case "databaseincludelist":
@@ -489,6 +506,8 @@ public class DebeziumMySqlComponentConfigurer extends PropertyConfigurerSupport
         case "includeSchemaChanges": return getOrCreateConfiguration(target).isIncludeSchemaChanges();
         case "inconsistentschemahandlingmode":
         case "inconsistentSchemaHandlingMode": return getOrCreateConfiguration(target).getInconsistentSchemaHandlingMode();
+        case "incrementalsnapshotchunksize":
+        case "incrementalSnapshotChunkSize": return getOrCreateConfiguration(target).getIncrementalSnapshotChunkSize();
         case "internalkeyconverter":
         case "internalKeyConverter": return getOrCreateConfiguration(target).getInternalKeyConverter();
         case "internalvalueconverter":
diff --git a/components/camel-debezium/camel-debezium-mysql/src/generated/java/org/apache/camel/component/debezium/DebeziumMySqlEndpointConfigurer.java b/components/camel-debezium/camel-debezium-mysql/src/generated/java/org/apache/camel/component/debezium/DebeziumMySqlEndpointConfigurer.java
index fbbeb26..448757c 100644
--- a/components/camel-debezium/camel-debezium-mysql/src/generated/java/org/apache/camel/component/debezium/DebeziumMySqlEndpointConfigurer.java
+++ b/components/camel-debezium/camel-debezium-mysql/src/generated/java/org/apache/camel/component/debezium/DebeziumMySqlEndpointConfigurer.java
@@ -1,14 +1,9 @@
 /* Generated by camel build tools - do NOT edit this file! */
 package org.apache.camel.component.debezium;
 
-import java.util.Map;
-
 import org.apache.camel.CamelContext;
-import org.apache.camel.spi.ExtendedPropertyConfigurerGetter;
-import org.apache.camel.spi.PropertyConfigurerGetter;
-import org.apache.camel.spi.ConfigurerStrategy;
 import org.apache.camel.spi.GeneratedPropertyConfigurer;
-import org.apache.camel.util.CaseInsensitiveMap;
+import org.apache.camel.spi.PropertyConfigurerGetter;
 import org.apache.camel.support.component.PropertyConfigurerSupport;
 
 /**
@@ -60,6 +55,12 @@ public class DebeziumMySqlEndpointConfigurer extends PropertyConfigurerSupport i
         case "databaseHistoryKafkaRecoveryPollIntervalMs": target.getConfiguration().setDatabaseHistoryKafkaRecoveryPollIntervalMs(property(camelContext, int.class, value)); return true;
         case "databasehistorykafkatopic":
         case "databaseHistoryKafkaTopic": target.getConfiguration().setDatabaseHistoryKafkaTopic(property(camelContext, java.lang.String.class, value)); return true;
+        case "databasehistoryskipunparseableddl":
+        case "databaseHistorySkipUnparseableDdl": target.getConfiguration().setDatabaseHistorySkipUnparseableDdl(property(camelContext, boolean.class, value)); return true;
+        case "databasehistorystoreonlycapturedtablesddl":
+        case "databaseHistoryStoreOnlyCapturedTablesDdl": target.getConfiguration().setDatabaseHistoryStoreOnlyCapturedTablesDdl(property(camelContext, boolean.class, value)); return true;
+        case "databasehistorystoreonlymonitoredtablesddl":
+        case "databaseHistoryStoreOnlyMonitoredTablesDdl": target.getConfiguration().setDatabaseHistoryStoreOnlyMonitoredTablesDdl(property(camelContext, boolean.class, value)); return true;
         case "databasehostname":
         case "databaseHostname": target.getConfiguration().setDatabaseHostname(property(camelContext, java.lang.String.class, value)); return true;
         case "databaseincludelist":
@@ -120,6 +121,8 @@ public class DebeziumMySqlEndpointConfigurer extends PropertyConfigurerSupport i
         case "includeSchemaChanges": target.getConfiguration().setIncludeSchemaChanges(property(camelContext, boolean.class, value)); return true;
         case "inconsistentschemahandlingmode":
         case "inconsistentSchemaHandlingMode": target.getConfiguration().setInconsistentSchemaHandlingMode(property(camelContext, java.lang.String.class, value)); return true;
+        case "incrementalsnapshotchunksize":
+        case "incrementalSnapshotChunkSize": target.getConfiguration().setIncrementalSnapshotChunkSize(property(camelContext, int.class, value)); return true;
         case "internalkeyconverter":
         case "internalKeyConverter": target.getConfiguration().setInternalKeyConverter(property(camelContext, java.lang.String.class, value)); return true;
         case "internalvalueconverter":
@@ -242,6 +245,12 @@ public class DebeziumMySqlEndpointConfigurer extends PropertyConfigurerSupport i
         case "databaseHistoryKafkaRecoveryPollIntervalMs": return int.class;
         case "databasehistorykafkatopic":
         case "databaseHistoryKafkaTopic": return java.lang.String.class;
+        case "databasehistoryskipunparseableddl":
+        case "databaseHistorySkipUnparseableDdl": return boolean.class;
+        case "databasehistorystoreonlycapturedtablesddl":
+        case "databaseHistoryStoreOnlyCapturedTablesDdl": return boolean.class;
+        case "databasehistorystoreonlymonitoredtablesddl":
+        case "databaseHistoryStoreOnlyMonitoredTablesDdl": return boolean.class;
         case "databasehostname":
         case "databaseHostname": return java.lang.String.class;
         case "databaseincludelist":
@@ -302,6 +311,8 @@ public class DebeziumMySqlEndpointConfigurer extends PropertyConfigurerSupport i
         case "includeSchemaChanges": return boolean.class;
         case "inconsistentschemahandlingmode":
         case "inconsistentSchemaHandlingMode": return java.lang.String.class;
+        case "incrementalsnapshotchunksize":
+        case "incrementalSnapshotChunkSize": return int.class;
         case "internalkeyconverter":
         case "internalKeyConverter": return java.lang.String.class;
         case "internalvalueconverter":
@@ -425,6 +436,12 @@ public class DebeziumMySqlEndpointConfigurer extends PropertyConfigurerSupport i
         case "databaseHistoryKafkaRecoveryPollIntervalMs": return target.getConfiguration().getDatabaseHistoryKafkaRecoveryPollIntervalMs();
         case "databasehistorykafkatopic":
         case "databaseHistoryKafkaTopic": return target.getConfiguration().getDatabaseHistoryKafkaTopic();
+        case "databasehistoryskipunparseableddl":
+        case "databaseHistorySkipUnparseableDdl": return target.getConfiguration().isDatabaseHistorySkipUnparseableDdl();
+        case "databasehistorystoreonlycapturedtablesddl":
+        case "databaseHistoryStoreOnlyCapturedTablesDdl": return target.getConfiguration().isDatabaseHistoryStoreOnlyCapturedTablesDdl();
+        case "databasehistorystoreonlymonitoredtablesddl":
+        case "databaseHistoryStoreOnlyMonitoredTablesDdl": return target.getConfiguration().isDatabaseHistoryStoreOnlyMonitoredTablesDdl();
         case "databasehostname":
         case "databaseHostname": return target.getConfiguration().getDatabaseHostname();
         case "databaseincludelist":
@@ -485,6 +502,8 @@ public class DebeziumMySqlEndpointConfigurer extends PropertyConfigurerSupport i
         case "includeSchemaChanges": return target.getConfiguration().isIncludeSchemaChanges();
         case "inconsistentschemahandlingmode":
         case "inconsistentSchemaHandlingMode": return target.getConfiguration().getInconsistentSchemaHandlingMode();
+        case "incrementalsnapshotchunksize":
+        case "incrementalSnapshotChunkSize": return target.getConfiguration().getIncrementalSnapshotChunkSize();
         case "internalkeyconverter":
         case "internalKeyConverter": return target.getConfiguration().getInternalKeyConverter();
         case "internalvalueconverter":
diff --git a/components/camel-debezium/camel-debezium-mysql/src/generated/java/org/apache/camel/component/debezium/DebeziumMySqlEndpointUriFactory.java b/components/camel-debezium/camel-debezium-mysql/src/generated/java/org/apache/camel/component/debezium/DebeziumMySqlEndpointUriFactory.java
index 8feeedf..78cfe10 100644
--- a/components/camel-debezium/camel-debezium-mysql/src/generated/java/org/apache/camel/component/debezium/DebeziumMySqlEndpointUriFactory.java
+++ b/components/camel-debezium/camel-debezium-mysql/src/generated/java/org/apache/camel/component/debezium/DebeziumMySqlEndpointUriFactory.java
@@ -20,7 +20,7 @@ public class DebeziumMySqlEndpointUriFactory extends org.apache.camel.support.co
     private static final Set<String> PROPERTY_NAMES;
     private static final Set<String> SECRET_PROPERTY_NAMES;
     static {
-        Set<String> props = new HashSet<>(89);
+        Set<String> props = new HashSet<>(93);
         props.add("databaseJdbcDriver");
         props.add("maxBatchSize");
         props.add("internalKeyConverter");
@@ -44,6 +44,7 @@ public class DebeziumMySqlEndpointUriFactory extends org.apache.camel.support.co
         props.add("sourceStructVersion");
         props.add("bigintUnsignedHandlingMode");
         props.add("databaseHostname");
+        props.add("databaseHistorySkipUnparseableDdl");
         props.add("databaseSslKeystorePassword");
         props.add("eventProcessingFailureHandlingMode");
         props.add("offsetCommitTimeoutMs");
@@ -70,6 +71,7 @@ public class DebeziumMySqlEndpointUriFactory extends org.apache.camel.support.co
         props.add("databaseHistory");
         props.add("includeQuery");
         props.add("columnPropagateSourceType");
+        props.add("databaseHistoryStoreOnlyCapturedTablesDdl");
         props.add("offsetStorage");
         props.add("includeSchemaChanges");
         props.add("internalValueConverter");
@@ -84,6 +86,7 @@ public class DebeziumMySqlEndpointUriFactory extends org.apache.camel.support.co
         props.add("sanitizeFieldNames");
         props.add("databaseHistoryKafkaTopic");
         props.add("tableWhitelist");
+        props.add("databaseHistoryStoreOnlyMonitoredTablesDdl");
         props.add("tableIgnoreBuiltin");
         props.add("signalDataCollection");
         props.add("exchangePattern");
@@ -102,6 +105,7 @@ public class DebeziumMySqlEndpointUriFactory extends org.apache.camel.support.co
         props.add("tableExcludeList");
         props.add("offsetCommitPolicy");
         props.add("tableIncludeList");
+        props.add("incrementalSnapshotChunkSize");
         props.add("columnExcludeList");
         props.add("gtidSourceIncludes");
         props.add("columnIncludeList");
diff --git a/components/camel-debezium/camel-debezium-mysql/src/generated/java/org/apache/camel/component/debezium/configuration/MySqlConnectorEmbeddedDebeziumConfiguration.java b/components/camel-debezium/camel-debezium-mysql/src/generated/java/org/apache/camel/component/debezium/configuration/MySqlConnectorEmbeddedDebeziumConfiguration.java
index 029b126..d20b561 100644
--- a/components/camel-debezium/camel-debezium-mysql/src/generated/java/org/apache/camel/component/debezium/configuration/MySqlConnectorEmbeddedDebeziumConfiguration.java
+++ b/components/camel-debezium/camel-debezium-mysql/src/generated/java/org/apache/camel/component/debezium/configuration/MySqlConnectorEmbeddedDebeziumConfiguration.java
@@ -36,6 +36,8 @@ public class MySqlConnectorEmbeddedDebeziumConfiguration
     private String signalDataCollection;
     @UriParam(label = LABEL_NAME)
     private String databaseInitialStatements;
+    @UriParam(label = LABEL_NAME, defaultValue = "false")
+    private boolean databaseHistoryStoreOnlyCapturedTablesDdl = false;
     @UriParam(label = LABEL_NAME)
     private String converters;
     @UriParam(label = LABEL_NAME, defaultValue = "__debezium-heartbeat")
@@ -83,6 +85,8 @@ public class MySqlConnectorEmbeddedDebeziumConfiguration
     private String databasePassword;
     @UriParam(label = LABEL_NAME)
     private String databaseExcludeList;
+    @UriParam(label = LABEL_NAME, defaultValue = "false")
+    private boolean databaseHistoryStoreOnlyMonitoredTablesDdl = false;
     @UriParam(label = LABEL_NAME, defaultValue = "true")
     private boolean gtidSourceFilterDmlEvents = true;
     @UriParam(label = LABEL_NAME, defaultValue = "2048")
@@ -99,6 +103,8 @@ public class MySqlConnectorEmbeddedDebeziumConfiguration
     private int connectTimeoutMs = 30000;
     @UriParam(label = LABEL_NAME, defaultValue = "8192")
     private int maxQueueSize = 8192;
+    @UriParam(label = LABEL_NAME, defaultValue = "1024")
+    private int incrementalSnapshotChunkSize = 1024;
     @UriParam(label = LABEL_NAME)
     private String databaseHistoryKafkaTopic;
     @UriParam(label = LABEL_NAME, defaultValue = "10s", javaType = "java.time.Duration")
@@ -117,6 +123,8 @@ public class MySqlConnectorEmbeddedDebeziumConfiguration
     private String decimalHandlingMode = "precise";
     @UriParam(label = LABEL_NAME, defaultValue = "off")
     private String snapshotNewTables = "off";
+    @UriParam(label = LABEL_NAME, defaultValue = "false")
+    private boolean databaseHistorySkipUnparseableDdl = false;
     @UriParam(label = LABEL_NAME, defaultValue = "true")
     private boolean tableIgnoreBuiltin = true;
     @UriParam(label = LABEL_NAME)
@@ -338,6 +346,20 @@ public class MySqlConnectorEmbeddedDebeziumConfiguration
     }
 
     /**
+     * Controls what DDL Debezium will store in database history. By default
+     * (false) Debezium will store all incoming DDL statements. If set to true,
+     * then only DDL that manipulates a captured table will be stored.
+     */
+    public void setDatabaseHistoryStoreOnlyCapturedTablesDdl(
+            boolean databaseHistoryStoreOnlyCapturedTablesDdl) {
+        this.databaseHistoryStoreOnlyCapturedTablesDdl = databaseHistoryStoreOnlyCapturedTablesDdl;
+    }
+
+    public boolean isDatabaseHistoryStoreOnlyCapturedTablesDdl() {
+        return databaseHistoryStoreOnlyCapturedTablesDdl;
+    }
+
+    /**
      * Optional list of custom converters that would be used instead of default
      * ones. The converters are defined using '<converter.prefix>.type' config
      * option and configured using options '<converter.prefix>.<option>'
@@ -453,7 +475,7 @@ public class MySqlConnectorEmbeddedDebeziumConfiguration
     /**
      *  This property contains a comma-separated list of fully-qualified tables
      * (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on
-     * thespecific connectors . Select statements for the individual tables are
+     * the specific connectors. Select statements for the individual tables are
      * specified in further configuration properties, one for each table,
      * identified by the id
      * 'snapshot.select.statement.overrides.[DB_NAME].[TABLE_NAME]' or
@@ -644,6 +666,22 @@ public class MySqlConnectorEmbeddedDebeziumConfiguration
     }
 
     /**
+     * Controls what DDL Debezium will store in database history. By default
+     * (false) Debezium will store all incoming DDL statements. If set to true,
+     * then only DDL that manipulates a monitored table will be stored
+     * (deprecated, use "database.history.store.only.captured.tables.ddl"
+     * instead)
+     */
+    public void setDatabaseHistoryStoreOnlyMonitoredTablesDdl(
+            boolean databaseHistoryStoreOnlyMonitoredTablesDdl) {
+        this.databaseHistoryStoreOnlyMonitoredTablesDdl = databaseHistoryStoreOnlyMonitoredTablesDdl;
+    }
+
+    public boolean isDatabaseHistoryStoreOnlyMonitoredTablesDdl() {
+        return databaseHistoryStoreOnlyMonitoredTablesDdl;
+    }
+
+    /**
      * If set to true, we will only produce DML events into Kafka for
      * transactions that were written on mysql servers with UUIDs matching the
      * filters defined by the gtid.source.includes or gtid.source.excludes
@@ -754,6 +792,17 @@ public class MySqlConnectorEmbeddedDebeziumConfiguration
     }
 
     /**
+     * The maximum size of chunk for incremental snapshotting
+     */
+    public void setIncrementalSnapshotChunkSize(int incrementalSnapshotChunkSize) {
+        this.incrementalSnapshotChunkSize = incrementalSnapshotChunkSize;
+    }
+
+    public int getIncrementalSnapshotChunkSize() {
+        return incrementalSnapshotChunkSize;
+    }
+
+    /**
      * The name of the topic for the database schema history
      */
     public void setDatabaseHistoryKafkaTopic(String databaseHistoryKafkaTopic) {
@@ -880,6 +929,21 @@ public class MySqlConnectorEmbeddedDebeziumConfiguration
     }
 
     /**
+     * Controls the action Debezium will take when it meets a DDL statement in
+     * binlog, that it cannot parse. By default the connector will stop operating
+     * but by changing the setting it can ignore the statements which it cannot
+     * parse. If skipping is enabled then Debezium can miss metadata changes.
+     */
+    public void setDatabaseHistorySkipUnparseableDdl(
+            boolean databaseHistorySkipUnparseableDdl) {
+        this.databaseHistorySkipUnparseableDdl = databaseHistorySkipUnparseableDdl;
+    }
+
+    public boolean isDatabaseHistorySkipUnparseableDdl() {
+        return databaseHistorySkipUnparseableDdl;
+    }
+
+    /**
      * Flag specifying whether built-in tables should be ignored.
      */
     public void setTableIgnoreBuiltin(boolean tableIgnoreBuiltin) {
@@ -1200,6 +1264,7 @@ public class MySqlConnectorEmbeddedDebeziumConfiguration
         addPropertyIfNotNull(configBuilder, "database.history.kafka.recovery.poll.interval.ms", databaseHistoryKafkaRecoveryPollIntervalMs);
         addPropertyIfNotNull(configBuilder, "signal.data.collection", signalDataCollection);
         addPropertyIfNotNull(configBuilder, "database.initial.statements", databaseInitialStatements);
+        addPropertyIfNotNull(configBuilder, "database.history.store.only.captured.tables.ddl", databaseHistoryStoreOnlyCapturedTablesDdl);
         addPropertyIfNotNull(configBuilder, "converters", converters);
         addPropertyIfNotNull(configBuilder, "heartbeat.topics.prefix", heartbeatTopicsPrefix);
         addPropertyIfNotNull(configBuilder, "binlog.buffer.size", binlogBufferSize);
@@ -1223,6 +1288,7 @@ public class MySqlConnectorEmbeddedDebeziumConfiguration
         addPropertyIfNotNull(configBuilder, "table.exclude.list", tableExcludeList);
         addPropertyIfNotNull(configBuilder, "database.password", databasePassword);
         addPropertyIfNotNull(configBuilder, "database.exclude.list", databaseExcludeList);
+        addPropertyIfNotNull(configBuilder, "database.history.store.only.monitored.tables.ddl", databaseHistoryStoreOnlyMonitoredTablesDdl);
         addPropertyIfNotNull(configBuilder, "gtid.source.filter.dml.events", gtidSourceFilterDmlEvents);
         addPropertyIfNotNull(configBuilder, "max.batch.size", maxBatchSize);
         addPropertyIfNotNull(configBuilder, "skipped.operations", skippedOperations);
@@ -1231,6 +1297,7 @@ public class MySqlConnectorEmbeddedDebeziumConfiguration
         addPropertyIfNotNull(configBuilder, "database.history", databaseHistory);
         addPropertyIfNotNull(configBuilder, "connect.timeout.ms", connectTimeoutMs);
         addPropertyIfNotNull(configBuilder, "max.queue.size", maxQueueSize);
+        addPropertyIfNotNull(configBuilder, "incremental.snapshot.chunk.size", incrementalSnapshotChunkSize);
         addPropertyIfNotNull(configBuilder, "database.history.kafka.topic", databaseHistoryKafkaTopic);
         addPropertyIfNotNull(configBuilder, "retriable.restart.connector.wait.ms", retriableRestartConnectorWaitMs);
         addPropertyIfNotNull(configBuilder, "snapshot.delay.ms", snapshotDelayMs);
@@ -1240,6 +1307,7 @@ public class MySqlConnectorEmbeddedDebeziumConfiguration
         addPropertyIfNotNull(configBuilder, "tombstones.on.delete", tombstonesOnDelete);
         addPropertyIfNotNull(configBuilder, "decimal.handling.mode", decimalHandlingMode);
         addPropertyIfNotNull(configBuilder, "snapshot.new.tables", snapshotNewTables);
+        addPropertyIfNotNull(configBuilder, "database.history.skip.unparseable.ddl", databaseHistorySkipUnparseableDdl);
         addPropertyIfNotNull(configBuilder, "table.ignore.builtin", tableIgnoreBuiltin);
         addPropertyIfNotNull(configBuilder, "snapshot.include.collection.list", snapshotIncludeCollectionList);
         addPropertyIfNotNull(configBuilder, "database.history.file.filename", databaseHistoryFileFilename);
diff --git a/components/camel-debezium/camel-debezium-mysql/src/generated/resources/org/apache/camel/component/debezium/debezium-mysql.json b/components/camel-debezium/camel-debezium-mysql/src/generated/resources/org/apache/camel/component/debezium/debezium-mysql.json
index 5cf4dcf..37e875c 100644
--- a/components/camel-debezium/camel-debezium-mysql/src/generated/resources/org/apache/camel/component/debezium/debezium-mysql.json
+++ b/components/camel-debezium/camel-debezium-mysql/src/generated/resources/org/apache/camel/component/debezium/debezium-mysql.json
@@ -54,6 +54,9 @@
     "databaseHistoryKafkaRecoveryAttempts": { "kind": "property", "displayName": "Database History Kafka Recovery Attempts", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 100, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The number o [...]
     "databaseHistoryKafkaRecoveryPollIntervalMs": { "kind": "property", "displayName": "Database History Kafka Recovery Poll Interval Ms", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "duration", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "100ms", "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "descript [...]
     "databaseHistoryKafkaTopic": { "kind": "property", "displayName": "Database History Kafka Topic", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The name of the topic for the database schem [...]
+    "databaseHistorySkipUnparseableDdl": { "kind": "property", "displayName": "Database History Skip Unparseable Ddl", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Controls the [...]
+    "databaseHistoryStoreOnlyCapturedTablesDdl": { "kind": "property", "displayName": "Database History Store Only Captured Tables Ddl", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "descripti [...]
+    "databaseHistoryStoreOnlyMonitoredTablesDdl": { "kind": "property", "displayName": "Database History Store Only Monitored Tables Ddl", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "descrip [...]
     "databaseHostname": { "kind": "property", "displayName": "Database Hostname", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Resolvable hostname or IP address of the database server." },
     "databaseIncludeList": { "kind": "property", "displayName": "Database Include List", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The databases for which changes are to be captured" },
     "databaseInitialStatements": { "kind": "property", "displayName": "Database Initial Statements", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "A semicolon separated list of SQL statements  [...]
@@ -82,6 +85,7 @@
     "includeQuery": { "kind": "property", "displayName": "Include Query", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Whether the connector should include the original SQL que [...]
     "includeSchemaChanges": { "kind": "property", "displayName": "Include Schema Changes", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Whether the connector should publish chan [...]
     "inconsistentSchemaHandlingMode": { "kind": "property", "displayName": "Inconsistent Schema Handling Mode", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "fail", "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Specify ho [...]
+    "incrementalSnapshotChunkSize": { "kind": "property", "displayName": "Incremental Snapshot Chunk Size", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1024, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The maximum size of chunk fo [...]
     "maxBatchSize": { "kind": "property", "displayName": "Max Batch Size", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 2048, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Maximum size of each batch of source records. Defaults to 2048." },
     "maxQueueSize": { "kind": "property", "displayName": "Max Queue Size", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 8192, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Maximum size of the queue for change events read from the dat [...]
     "maxQueueSizeInBytes": { "kind": "property", "displayName": "Max Queue Size In Bytes", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 0, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Maximum size of the queue in bytes for change e [...]
@@ -101,7 +105,7 @@
     "snapshotMaxThreads": { "kind": "property", "displayName": "Snapshot Max Threads", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The maximum number of threads used to perform the sn [...]
     "snapshotMode": { "kind": "property", "displayName": "Snapshot Mode", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "initial", "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The criteria for running a snapshot upon star [...]
     "snapshotNewTables": { "kind": "property", "displayName": "Snapshot New Tables", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "off", "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "BETA FEATURE: On connector restart, th [...]
-    "snapshotSelectStatementOverrides": { "kind": "property", "displayName": "Snapshot Select Statement Overrides", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "This property contains a comma [...]
+    "snapshotSelectStatementOverrides": { "kind": "property", "displayName": "Snapshot Select Statement Overrides", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "This property contains a comma [...]
     "sourceStructVersion": { "kind": "property", "displayName": "Source Struct Version", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "v2", "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "A version of the format of the publ [...]
     "tableBlacklist": { "kind": "property", "displayName": "Table Blacklist", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "A comma-separated list of regular expressions that match the fully-q [...]
     "tableExcludeList": { "kind": "property", "displayName": "Table Exclude List", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "A comma-separated list of regular expressions that match the fu [...]
@@ -145,6 +149,9 @@
     "databaseHistoryKafkaRecoveryAttempts": { "kind": "parameter", "displayName": "Database History Kafka Recovery Attempts", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 100, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The number  [...]
     "databaseHistoryKafkaRecoveryPollIntervalMs": { "kind": "parameter", "displayName": "Database History Kafka Recovery Poll Interval Ms", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "duration", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "100ms", "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "descrip [...]
     "databaseHistoryKafkaTopic": { "kind": "parameter", "displayName": "Database History Kafka Topic", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The name of the topic for the database sche [...]
+    "databaseHistorySkipUnparseableDdl": { "kind": "parameter", "displayName": "Database History Skip Unparseable Ddl", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Controls th [...]
+    "databaseHistoryStoreOnlyCapturedTablesDdl": { "kind": "parameter", "displayName": "Database History Store Only Captured Tables Ddl", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "descript [...]
+    "databaseHistoryStoreOnlyMonitoredTablesDdl": { "kind": "parameter", "displayName": "Database History Store Only Monitored Tables Ddl", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "descri [...]
     "databaseHostname": { "kind": "parameter", "displayName": "Database Hostname", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Resolvable hostname or IP address of the database server." },
     "databaseIncludeList": { "kind": "parameter", "displayName": "Database Include List", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The databases for which changes are to be captured" },
     "databaseInitialStatements": { "kind": "parameter", "displayName": "Database Initial Statements", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "A semicolon separated list of SQL statements [...]
@@ -173,6 +180,7 @@
     "includeQuery": { "kind": "parameter", "displayName": "Include Query", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Whether the connector should include the original SQL qu [...]
     "includeSchemaChanges": { "kind": "parameter", "displayName": "Include Schema Changes", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Whether the connector should publish cha [...]
     "inconsistentSchemaHandlingMode": { "kind": "parameter", "displayName": "Inconsistent Schema Handling Mode", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "fail", "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Specify h [...]
+    "incrementalSnapshotChunkSize": { "kind": "parameter", "displayName": "Incremental Snapshot Chunk Size", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1024, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The maximum size of chunk f [...]
     "maxBatchSize": { "kind": "parameter", "displayName": "Max Batch Size", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 2048, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Maximum size of each batch of source records. Defaults to 2048." },
     "maxQueueSize": { "kind": "parameter", "displayName": "Max Queue Size", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 8192, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Maximum size of the queue for change events read from the da [...]
     "maxQueueSizeInBytes": { "kind": "parameter", "displayName": "Max Queue Size In Bytes", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 0, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Maximum size of the queue in bytes for change  [...]
@@ -192,7 +200,7 @@
     "snapshotMaxThreads": { "kind": "parameter", "displayName": "Snapshot Max Threads", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The maximum number of threads used to perform the s [...]
     "snapshotMode": { "kind": "parameter", "displayName": "Snapshot Mode", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "initial", "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The criteria for running a snapshot upon sta [...]
     "snapshotNewTables": { "kind": "parameter", "displayName": "Snapshot New Tables", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "off", "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "BETA FEATURE: On connector restart, t [...]
-    "snapshotSelectStatementOverrides": { "kind": "parameter", "displayName": "Snapshot Select Statement Overrides", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "This property contains a comm [...]
+    "snapshotSelectStatementOverrides": { "kind": "parameter", "displayName": "Snapshot Select Statement Overrides", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "This property contains a comm [...]
     "sourceStructVersion": { "kind": "parameter", "displayName": "Source Struct Version", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "v2", "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "A version of the format of the pub [...]
     "tableBlacklist": { "kind": "parameter", "displayName": "Table Blacklist", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "A comma-separated list of regular expressions that match the fully- [...]
     "tableExcludeList": { "kind": "parameter", "displayName": "Table Exclude List", "group": "mysql", "label": "consumer,mysql", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "A comma-separated list of regular expressions that match the f [...]
diff --git a/components/camel-debezium/camel-debezium-mysql/src/main/docs/debezium-mysql-component.adoc b/components/camel-debezium/camel-debezium-mysql/src/main/docs/debezium-mysql-component.adoc
index 7cc5b62..7d8e9a8 100644
--- a/components/camel-debezium/camel-debezium-mysql/src/main/docs/debezium-mysql-component.adoc
+++ b/components/camel-debezium/camel-debezium-mysql/src/main/docs/debezium-mysql-component.adoc
@@ -56,7 +56,7 @@ debezium-mysql:name[?options]
 
 
 // component options: START
-The Debezium MySQL Connector component supports 88 options, which are listed below.
+The Debezium MySQL Connector component supports 92 options, which are listed below.
 
 
 
@@ -95,6 +95,9 @@ The Debezium MySQL Connector component supports 88 options, which are listed bel
 | *databaseHistoryKafkaRecovery{zwsp}Attempts* (mysql) | The number of attempts in a row that no data are returned from Kafka before recover completes. The maximum amount of time to wait after receiving no data is (recovery.attempts) x (recovery.poll.interval.ms). | 100 | int
 | *databaseHistoryKafkaRecovery{zwsp}PollIntervalMs* (mysql) | The number of milliseconds to wait while polling for persisted data during recovery. | 100ms | int
 | *databaseHistoryKafkaTopic* (mysql) | The name of the topic for the database schema history |  | String
+| *databaseHistorySkipUnparseable{zwsp}Ddl* (mysql) | Controls the action Debezium will take when it meets a DDL statement in binlog that it cannot parse. By default the connector will stop operating, but by changing the setting it can ignore the statements which it cannot parse. If skipping is enabled then Debezium can miss metadata changes. | false | boolean
+| *databaseHistoryStoreOnly{zwsp}CapturedTablesDdl* (mysql) | Controls what DDL Debezium will store in database history. By default (false) Debezium will store all incoming DDL statements. If set to true, then only DDL that manipulates a captured table will be stored. | false | boolean
+| *databaseHistoryStoreOnly{zwsp}MonitoredTablesDdl* (mysql) | Controls what DDL Debezium will store in database history. By default (false) Debezium will store all incoming DDL statements. If set to true, then only DDL that manipulates a monitored table will be stored (deprecated, use database.history.store.only.captured.tables.ddl instead) | false | boolean
 | *databaseHostname* (mysql) | Resolvable hostname or IP address of the database server. |  | String
 | *databaseIncludeList* (mysql) | The databases for which changes are to be captured |  | String
 | *databaseInitialStatements* (mysql) | A semicolon separated list of SQL statements to be executed when a JDBC connection (not binlog reading connection) to the database is established. Note that the connector may establish JDBC connections at its own discretion, so this should typically be used for configuration of session parameters only,but not for executing DML statements. Use doubled semicolon (';;') to use a semicolon as a character and not as a delimiter. |  | String
@@ -123,6 +126,7 @@ The Debezium MySQL Connector component supports 88 options, which are listed bel
 | *includeQuery* (mysql) | Whether the connector should include the original SQL query that generated the change event. Note: This option requires MySQL be configured with the binlog_rows_query_log_events option set to ON. Query will not be present for events generated from snapshot. WARNING: Enabling this option may expose tables or fields explicitly blacklisted or masked by including the original SQL statement in the change event. For this reason the default value is 'false'. | false | [...]
 | *includeSchemaChanges* (mysql) | Whether the connector should publish changes in the database schema to a Kafka topic with the same name as the database server ID. Each schema change will be recorded using a key that contains the database name and whose value include logical description of the new schema and optionally the DDL statement(s).The default is 'true'. This is independent of how the connector internally records database history. | true | boolean
 | *inconsistentSchemaHandlingMode* (mysql) | Specify how binlog events that belong to a table missing from internal schema representation (i.e. internal representation is not consistent with database) should be handled, including:'fail' (the default) an exception indicating the problematic event and its binlog position is raised, causing the connector to be stopped; 'warn' the problematic event and its binlog position will be logged and the event will be skipped;'skip' the problematic ev [...]
+| *incrementalSnapshotChunkSize* (mysql) | The maximum size of chunk for incremental snapshotting | 1024 | int
 | *maxBatchSize* (mysql) | Maximum size of each batch of source records. Defaults to 2048. | 2048 | int
 | *maxQueueSize* (mysql) | Maximum size of the queue for change events read from the database log but not yet recorded or forwarded. Defaults to 8192, and should always be larger than the maximum batch size. | 8192 | int
 | *maxQueueSizeInBytes* (mysql) | Maximum size of the queue in bytes for change events read from the database log but not yet recorded or forwarded. Defaults to 0. Mean the feature is not enabled | 0 | long
@@ -142,7 +146,7 @@ The Debezium MySQL Connector component supports 88 options, which are listed bel
 | *snapshotMaxThreads* (mysql) | The maximum number of threads used to perform the snapshot. Defaults to 1. | 1 | int
 | *snapshotMode* (mysql) | The criteria for running a snapshot upon startup of the connector. Options include: 'when_needed' to specify that the connector run a snapshot upon startup whenever it deems it necessary; 'schema_only' to only take a snapshot of the schema (table structures) but no actual data; 'initial' (the default) to specify the connector can run a snapshot only when no offsets are available for the logical server name; 'initial_only' same as 'initial' except the connector  [...]
 | *snapshotNewTables* (mysql) | BETA FEATURE: On connector restart, the connector will check if there have been any new tables added to the configuration, and snapshot them. There is presently only two options:'off': Default behavior. Do not snapshot new tables.'parallel': The snapshot of the new tables will occur in parallel to the continued binlog reading of the old tables. When the snapshot completes, an independent binlog reader will begin reading the events for the new tables until  [...]
-| *snapshotSelectStatement{zwsp}Overrides* (mysql) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on thespecific connectors . Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The val [...]
+| *snapshotSelectStatement{zwsp}Overrides* (mysql) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on the specific connectors. Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The valu [...]
 | *sourceStructVersion* (mysql) | A version of the format of the publicly visible source part in the message | v2 | String
 | *tableBlacklist* (mysql) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring (deprecated, use table.exclude.list instead) |  | String
 | *tableExcludeList* (mysql) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring |  | String
@@ -174,7 +178,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (88 parameters):
+=== Query Parameters (92 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -212,6 +216,9 @@ with the following path and query parameters:
 | *databaseHistoryKafkaRecovery{zwsp}Attempts* (mysql) | The number of attempts in a row that no data are returned from Kafka before recover completes. The maximum amount of time to wait after receiving no data is (recovery.attempts) x (recovery.poll.interval.ms). | 100 | int
 | *databaseHistoryKafkaRecovery{zwsp}PollIntervalMs* (mysql) | The number of milliseconds to wait while polling for persisted data during recovery. | 100ms | int
 | *databaseHistoryKafkaTopic* (mysql) | The name of the topic for the database schema history |  | String
+| *databaseHistorySkipUnparseable{zwsp}Ddl* (mysql) | Controls the action Debezium will take when it meets a DDL statement in binlog that it cannot parse. By default the connector will stop operating, but by changing the setting it can ignore the statements which it cannot parse. If skipping is enabled then Debezium can miss metadata changes. | false | boolean
+| *databaseHistoryStoreOnly{zwsp}CapturedTablesDdl* (mysql) | Controls what DDL Debezium will store in database history. By default (false) Debezium will store all incoming DDL statements. If set to true, then only DDL that manipulates a captured table will be stored. | false | boolean
+| *databaseHistoryStoreOnly{zwsp}MonitoredTablesDdl* (mysql) | Controls what DDL Debezium will store in database history. By default (false) Debezium will store all incoming DDL statements. If set to true, then only DDL that manipulates a monitored table will be stored (deprecated, use database.history.store.only.captured.tables.ddl instead) | false | boolean
 | *databaseHostname* (mysql) | Resolvable hostname or IP address of the database server. |  | String
 | *databaseIncludeList* (mysql) | The databases for which changes are to be captured |  | String
 | *databaseInitialStatements* (mysql) | A semicolon separated list of SQL statements to be executed when a JDBC connection (not binlog reading connection) to the database is established. Note that the connector may establish JDBC connections at its own discretion, so this should typically be used for configuration of session parameters only,but not for executing DML statements. Use doubled semicolon (';;') to use a semicolon as a character and not as a delimiter. |  | String
@@ -240,6 +247,7 @@ with the following path and query parameters:
 | *includeQuery* (mysql) | Whether the connector should include the original SQL query that generated the change event. Note: This option requires MySQL be configured with the binlog_rows_query_log_events option set to ON. Query will not be present for events generated from snapshot. WARNING: Enabling this option may expose tables or fields explicitly blacklisted or masked by including the original SQL statement in the change event. For this reason the default value is 'false'. | false | [...]
 | *includeSchemaChanges* (mysql) | Whether the connector should publish changes in the database schema to a Kafka topic with the same name as the database server ID. Each schema change will be recorded using a key that contains the database name and whose value include logical description of the new schema and optionally the DDL statement(s).The default is 'true'. This is independent of how the connector internally records database history. | true | boolean
 | *inconsistentSchemaHandlingMode* (mysql) | Specify how binlog events that belong to a table missing from internal schema representation (i.e. internal representation is not consistent with database) should be handled, including:'fail' (the default) an exception indicating the problematic event and its binlog position is raised, causing the connector to be stopped; 'warn' the problematic event and its binlog position will be logged and the event will be skipped;'skip' the problematic ev [...]
+| *incrementalSnapshotChunkSize* (mysql) | The maximum size of chunk for incremental snapshotting | 1024 | int
 | *maxBatchSize* (mysql) | Maximum size of each batch of source records. Defaults to 2048. | 2048 | int
 | *maxQueueSize* (mysql) | Maximum size of the queue for change events read from the database log but not yet recorded or forwarded. Defaults to 8192, and should always be larger than the maximum batch size. | 8192 | int
 | *maxQueueSizeInBytes* (mysql) | Maximum size of the queue in bytes for change events read from the database log but not yet recorded or forwarded. Defaults to 0. Mean the feature is not enabled | 0 | long
@@ -259,7 +267,7 @@ with the following path and query parameters:
 | *snapshotMaxThreads* (mysql) | The maximum number of threads used to perform the snapshot. Defaults to 1. | 1 | int
 | *snapshotMode* (mysql) | The criteria for running a snapshot upon startup of the connector. Options include: 'when_needed' to specify that the connector run a snapshot upon startup whenever it deems it necessary; 'schema_only' to only take a snapshot of the schema (table structures) but no actual data; 'initial' (the default) to specify the connector can run a snapshot only when no offsets are available for the logical server name; 'initial_only' same as 'initial' except the connector  [...]
 | *snapshotNewTables* (mysql) | BETA FEATURE: On connector restart, the connector will check if there have been any new tables added to the configuration, and snapshot them. There is presently only two options:'off': Default behavior. Do not snapshot new tables.'parallel': The snapshot of the new tables will occur in parallel to the continued binlog reading of the old tables. When the snapshot completes, an independent binlog reader will begin reading the events for the new tables until  [...]
-| *snapshotSelectStatement{zwsp}Overrides* (mysql) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on thespecific connectors . Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The val [...]
+| *snapshotSelectStatement{zwsp}Overrides* (mysql) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on the specific connectors. Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The valu [...]
 | *sourceStructVersion* (mysql) | A version of the format of the publicly visible source part in the message | v2 | String
 | *tableBlacklist* (mysql) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring (deprecated, use table.exclude.list instead) |  | String
 | *tableExcludeList* (mysql) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring |  | String
diff --git a/components/camel-debezium/camel-debezium-postgres/src/generated/java/org/apache/camel/component/debezium/DebeziumPostgresComponentConfigurer.java b/components/camel-debezium/camel-debezium-postgres/src/generated/java/org/apache/camel/component/debezium/DebeziumPostgresComponentConfigurer.java
index 42524a3..0810a46 100644
--- a/components/camel-debezium/camel-debezium-postgres/src/generated/java/org/apache/camel/component/debezium/DebeziumPostgresComponentConfigurer.java
+++ b/components/camel-debezium/camel-debezium-postgres/src/generated/java/org/apache/camel/component/debezium/DebeziumPostgresComponentConfigurer.java
@@ -1,14 +1,9 @@
 /* Generated by camel build tools - do NOT edit this file! */
 package org.apache.camel.component.debezium;
 
-import java.util.Map;
-
 import org.apache.camel.CamelContext;
-import org.apache.camel.spi.ExtendedPropertyConfigurerGetter;
-import org.apache.camel.spi.PropertyConfigurerGetter;
-import org.apache.camel.spi.ConfigurerStrategy;
 import org.apache.camel.spi.GeneratedPropertyConfigurer;
-import org.apache.camel.util.CaseInsensitiveMap;
+import org.apache.camel.spi.PropertyConfigurerGetter;
 import org.apache.camel.support.component.PropertyConfigurerSupport;
 
 /**
@@ -94,6 +89,8 @@ public class DebeziumPostgresComponentConfigurer extends PropertyConfigurerSuppo
         case "hstoreHandlingMode": getOrCreateConfiguration(target).setHstoreHandlingMode(property(camelContext, java.lang.String.class, value)); return true;
         case "includeunknowndatatypes":
         case "includeUnknownDatatypes": getOrCreateConfiguration(target).setIncludeUnknownDatatypes(property(camelContext, boolean.class, value)); return true;
+        case "incrementalsnapshotchunksize":
+        case "incrementalSnapshotChunkSize": getOrCreateConfiguration(target).setIncrementalSnapshotChunkSize(property(camelContext, int.class, value)); return true;
         case "internalkeyconverter":
         case "internalKeyConverter": getOrCreateConfiguration(target).setInternalKeyConverter(property(camelContext, java.lang.String.class, value)); return true;
         case "internalvalueconverter":
@@ -275,6 +272,8 @@ public class DebeziumPostgresComponentConfigurer extends PropertyConfigurerSuppo
         case "hstoreHandlingMode": return java.lang.String.class;
         case "includeunknowndatatypes":
         case "includeUnknownDatatypes": return boolean.class;
+        case "incrementalsnapshotchunksize":
+        case "incrementalSnapshotChunkSize": return int.class;
         case "internalkeyconverter":
         case "internalKeyConverter": return java.lang.String.class;
         case "internalvalueconverter":
@@ -457,6 +456,8 @@ public class DebeziumPostgresComponentConfigurer extends PropertyConfigurerSuppo
         case "hstoreHandlingMode": return getOrCreateConfiguration(target).getHstoreHandlingMode();
         case "includeunknowndatatypes":
         case "includeUnknownDatatypes": return getOrCreateConfiguration(target).isIncludeUnknownDatatypes();
+        case "incrementalsnapshotchunksize":
+        case "incrementalSnapshotChunkSize": return getOrCreateConfiguration(target).getIncrementalSnapshotChunkSize();
         case "internalkeyconverter":
         case "internalKeyConverter": return getOrCreateConfiguration(target).getInternalKeyConverter();
         case "internalvalueconverter":
diff --git a/components/camel-debezium/camel-debezium-postgres/src/generated/java/org/apache/camel/component/debezium/DebeziumPostgresEndpointConfigurer.java b/components/camel-debezium/camel-debezium-postgres/src/generated/java/org/apache/camel/component/debezium/DebeziumPostgresEndpointConfigurer.java
index a6feba1..2dc7062 100644
--- a/components/camel-debezium/camel-debezium-postgres/src/generated/java/org/apache/camel/component/debezium/DebeziumPostgresEndpointConfigurer.java
+++ b/components/camel-debezium/camel-debezium-postgres/src/generated/java/org/apache/camel/component/debezium/DebeziumPostgresEndpointConfigurer.java
@@ -1,14 +1,9 @@
 /* Generated by camel build tools - do NOT edit this file! */
 package org.apache.camel.component.debezium;
 
-import java.util.Map;
-
 import org.apache.camel.CamelContext;
-import org.apache.camel.spi.ExtendedPropertyConfigurerGetter;
-import org.apache.camel.spi.PropertyConfigurerGetter;
-import org.apache.camel.spi.ConfigurerStrategy;
 import org.apache.camel.spi.GeneratedPropertyConfigurer;
-import org.apache.camel.util.CaseInsensitiveMap;
+import org.apache.camel.spi.PropertyConfigurerGetter;
 import org.apache.camel.support.component.PropertyConfigurerSupport;
 
 /**
@@ -88,6 +83,8 @@ public class DebeziumPostgresEndpointConfigurer extends PropertyConfigurerSuppor
         case "hstoreHandlingMode": target.getConfiguration().setHstoreHandlingMode(property(camelContext, java.lang.String.class, value)); return true;
         case "includeunknowndatatypes":
         case "includeUnknownDatatypes": target.getConfiguration().setIncludeUnknownDatatypes(property(camelContext, boolean.class, value)); return true;
+        case "incrementalsnapshotchunksize":
+        case "incrementalSnapshotChunkSize": target.getConfiguration().setIncrementalSnapshotChunkSize(property(camelContext, int.class, value)); return true;
         case "internalkeyconverter":
         case "internalKeyConverter": target.getConfiguration().setInternalKeyConverter(property(camelContext, java.lang.String.class, value)); return true;
         case "internalvalueconverter":
@@ -270,6 +267,8 @@ public class DebeziumPostgresEndpointConfigurer extends PropertyConfigurerSuppor
         case "hstoreHandlingMode": return java.lang.String.class;
         case "includeunknowndatatypes":
         case "includeUnknownDatatypes": return boolean.class;
+        case "incrementalsnapshotchunksize":
+        case "incrementalSnapshotChunkSize": return int.class;
         case "internalkeyconverter":
         case "internalKeyConverter": return java.lang.String.class;
         case "internalvalueconverter":
@@ -453,6 +452,8 @@ public class DebeziumPostgresEndpointConfigurer extends PropertyConfigurerSuppor
         case "hstoreHandlingMode": return target.getConfiguration().getHstoreHandlingMode();
         case "includeunknowndatatypes":
         case "includeUnknownDatatypes": return target.getConfiguration().isIncludeUnknownDatatypes();
+        case "incrementalsnapshotchunksize":
+        case "incrementalSnapshotChunkSize": return target.getConfiguration().getIncrementalSnapshotChunkSize();
         case "internalkeyconverter":
         case "internalKeyConverter": return target.getConfiguration().getInternalKeyConverter();
         case "internalvalueconverter":
diff --git a/components/camel-debezium/camel-debezium-postgres/src/generated/java/org/apache/camel/component/debezium/DebeziumPostgresEndpointUriFactory.java b/components/camel-debezium/camel-debezium-postgres/src/generated/java/org/apache/camel/component/debezium/DebeziumPostgresEndpointUriFactory.java
index e9330c2..8e28e7a 100644
--- a/components/camel-debezium/camel-debezium-postgres/src/generated/java/org/apache/camel/component/debezium/DebeziumPostgresEndpointUriFactory.java
+++ b/components/camel-debezium/camel-debezium-postgres/src/generated/java/org/apache/camel/component/debezium/DebeziumPostgresEndpointUriFactory.java
@@ -20,7 +20,7 @@ public class DebeziumPostgresEndpointUriFactory extends org.apache.camel.support
     private static final Set<String> PROPERTY_NAMES;
     private static final Set<String> SECRET_PROPERTY_NAMES;
     static {
-        Set<String> props = new HashSet<>(89);
+        Set<String> props = new HashSet<>(90);
         props.add("slotDropOnStop");
         props.add("includeUnknownDatatypes");
         props.add("maxBatchSize");
@@ -104,6 +104,7 @@ public class DebeziumPostgresEndpointUriFactory extends org.apache.camel.support
         props.add("offsetCommitPolicy");
         props.add("tableIncludeList");
         props.add("intervalHandlingMode");
+        props.add("incrementalSnapshotChunkSize");
         props.add("columnExcludeList");
         props.add("databaseSslfactory");
         props.add("columnIncludeList");
diff --git a/components/camel-debezium/camel-debezium-postgres/src/generated/java/org/apache/camel/component/debezium/configuration/PostgresConnectorEmbeddedDebeziumConfiguration.java b/components/camel-debezium/camel-debezium-postgres/src/generated/java/org/apache/camel/component/debezium/configuration/PostgresConnectorEmbeddedDebeziumConfiguration.java
index be79312..16c565a 100644
--- a/components/camel-debezium/camel-debezium-postgres/src/generated/java/org/apache/camel/component/debezium/configuration/PostgresConnectorEmbeddedDebeziumConfiguration.java
+++ b/components/camel-debezium/camel-debezium-postgres/src/generated/java/org/apache/camel/component/debezium/configuration/PostgresConnectorEmbeddedDebeziumConfiguration.java
@@ -105,6 +105,8 @@ public class PostgresConnectorEmbeddedDebeziumConfiguration
     private String snapshotCustomClass;
     @UriParam(label = LABEL_NAME, defaultValue = "debezium")
     private String slotName = "debezium";
+    @UriParam(label = LABEL_NAME, defaultValue = "1024")
+    private int incrementalSnapshotChunkSize = 1024;
     @UriParam(label = LABEL_NAME, defaultValue = "json")
     private String hstoreHandlingMode = "json";
     @UriParam(label = LABEL_NAME, defaultValue = "10s", javaType = "java.time.Duration")
@@ -520,7 +522,7 @@ public class PostgresConnectorEmbeddedDebeziumConfiguration
     /**
      *  This property contains a comma-separated list of fully-qualified tables
      * (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on
-     * thespecific connectors . Select statements for the individual tables are
+     * the specific connectors. Select statements for the individual tables are
      * specified in further configuration properties, one for each table,
      * identified by the id
      * 'snapshot.select.statement.overrides.[DB_NAME].[TABLE_NAME]' or
@@ -722,9 +724,8 @@ public class PostgresConnectorEmbeddedDebeziumConfiguration
      * stop after completing the snapshot and before it would normally start
      * emitting changes;'never' to specify the connector should never run a
      * snapshot and that upon first startup the connector should read from the
-     * last position (LSN) recorded by the server; and'exported' to specify the
-     * connector should run a snapshot based on the position when the
-     * replication slot was created; 'custom' to specify a custom class with
+     * last position (LSN) recorded by the server; and 'exported' (deprecated, use
+     * 'initial' instead); 'custom' to specify a custom class with
      * 'snapshot.custom_class' which will be loaded and used to determine the
      * snapshot, see docs for more details.
      */
@@ -777,6 +778,17 @@ public class PostgresConnectorEmbeddedDebeziumConfiguration
     }
 
     /**
+     * The maximum size of chunk for incremental snapshotting
+     */
+    public void setIncrementalSnapshotChunkSize(int incrementalSnapshotChunkSize) {
+        this.incrementalSnapshotChunkSize = incrementalSnapshotChunkSize;
+    }
+
+    public int getIncrementalSnapshotChunkSize() {
+        return incrementalSnapshotChunkSize;
+    }
+
+    /**
      * Specify how HSTORE columns should be represented in change events,
      * including:'json' represents values as string-ified JSON (default)'map'
      * represents values as a key/value map
@@ -1201,6 +1213,7 @@ public class PostgresConnectorEmbeddedDebeziumConfiguration
         addPropertyIfNotNull(configBuilder, "max.queue.size", maxQueueSize);
         addPropertyIfNotNull(configBuilder, "snapshot.custom.class", snapshotCustomClass);
         addPropertyIfNotNull(configBuilder, "slot.name", slotName);
+        addPropertyIfNotNull(configBuilder, "incremental.snapshot.chunk.size", incrementalSnapshotChunkSize);
         addPropertyIfNotNull(configBuilder, "hstore.handling.mode", hstoreHandlingMode);
         addPropertyIfNotNull(configBuilder, "retriable.restart.connector.wait.ms", retriableRestartConnectorWaitMs);
         addPropertyIfNotNull(configBuilder, "snapshot.delay.ms", snapshotDelayMs);
diff --git a/components/camel-debezium/camel-debezium-postgres/src/generated/resources/org/apache/camel/component/debezium/debezium-postgres.json b/components/camel-debezium/camel-debezium-postgres/src/generated/resources/org/apache/camel/component/debezium/debezium-postgres.json
index e0ab153..4173552 100644
--- a/components/camel-debezium/camel-debezium-postgres/src/generated/resources/org/apache/camel/component/debezium/debezium-postgres.json
+++ b/components/camel-debezium/camel-debezium-postgres/src/generated/resources/org/apache/camel/component/debezium/debezium-postgres.json
@@ -66,6 +66,7 @@
     "heartbeatTopicsPrefix": { "kind": "property", "displayName": "Heartbeat Topics Prefix", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "__debezium-heartbeat", "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The  [...]
     "hstoreHandlingMode": { "kind": "property", "displayName": "Hstore Handling Mode", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "json", "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Specify how HSTORE columns [...]
     "includeUnknownDatatypes": { "kind": "property", "displayName": "Include Unknown Datatypes", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Specify whether the field [...]
+    "incrementalSnapshotChunkSize": { "kind": "property", "displayName": "Incremental Snapshot Chunk Size", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1024, "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The maximum size of [...]
     "intervalHandlingMode": { "kind": "property", "displayName": "Interval Handling Mode", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "numeric", "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Specify how INTERVA [...]
     "maxBatchSize": { "kind": "property", "displayName": "Max Batch Size", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 2048, "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Maximum size of each batch of source records. Defaul [...]
     "maxQueueSize": { "kind": "property", "displayName": "Max Queue Size", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 8192, "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Maximum size of the queue for change events read fro [...]
@@ -97,8 +98,8 @@
     "snapshotIncludeCollectionList": { "kind": "property", "displayName": "Snapshot Include Collection List", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "this setting must be set to [...]
     "snapshotLockTimeoutMs": { "kind": "property", "displayName": "Snapshot Lock Timeout Ms", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10s", "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The maximum number of millis t [...]
     "snapshotMaxThreads": { "kind": "property", "displayName": "Snapshot Max Threads", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The maximum number of threads used to perfo [...]
-    "snapshotMode": { "kind": "property", "displayName": "Snapshot Mode", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "initial", "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The criteria for running a snapshot  [...]
-    "snapshotSelectStatementOverrides": { "kind": "property", "displayName": "Snapshot Select Statement Overrides", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "This property contain [...]
+    "snapshotMode": { "kind": "property", "displayName": "Snapshot Mode", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "initial", "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The criteria for running a snapshot  [...]
+    "snapshotSelectStatementOverrides": { "kind": "property", "displayName": "Snapshot Select Statement Overrides", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "This property contain [...]
     "sourceStructVersion": { "kind": "property", "displayName": "Source Struct Version", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "v2", "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "A version of the format of [...]
     "statusUpdateIntervalMs": { "kind": "property", "displayName": "Status Update Interval Ms", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "duration", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10s", "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Frequency for sending replica [...]
     "tableBlacklist": { "kind": "property", "displayName": "Table Blacklist", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "A comma-separated list of regular expressions that match th [...]
@@ -157,6 +158,7 @@
     "heartbeatTopicsPrefix": { "kind": "parameter", "displayName": "Heartbeat Topics Prefix", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "__debezium-heartbeat", "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The [...]
     "hstoreHandlingMode": { "kind": "parameter", "displayName": "Hstore Handling Mode", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "json", "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Specify how HSTORE column [...]
     "includeUnknownDatatypes": { "kind": "parameter", "displayName": "Include Unknown Datatypes", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Specify whether the fiel [...]
+    "incrementalSnapshotChunkSize": { "kind": "parameter", "displayName": "Incremental Snapshot Chunk Size", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1024, "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The maximum size o [...]
     "intervalHandlingMode": { "kind": "parameter", "displayName": "Interval Handling Mode", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "numeric", "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Specify how INTERV [...]
     "maxBatchSize": { "kind": "parameter", "displayName": "Max Batch Size", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 2048, "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Maximum size of each batch of source records. Defau [...]
     "maxQueueSize": { "kind": "parameter", "displayName": "Max Queue Size", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 8192, "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Maximum size of the queue for change events read fr [...]
@@ -188,8 +190,8 @@
     "snapshotIncludeCollectionList": { "kind": "parameter", "displayName": "Snapshot Include Collection List", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "this setting must be set t [...]
     "snapshotLockTimeoutMs": { "kind": "parameter", "displayName": "Snapshot Lock Timeout Ms", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10s", "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The maximum number of millis  [...]
     "snapshotMaxThreads": { "kind": "parameter", "displayName": "Snapshot Max Threads", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The maximum number of threads used to perf [...]
-    "snapshotMode": { "kind": "parameter", "displayName": "Snapshot Mode", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "initial", "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The criteria for running a snapshot [...]
-    "snapshotSelectStatementOverrides": { "kind": "parameter", "displayName": "Snapshot Select Statement Overrides", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "This property contai [...]
+    "snapshotMode": { "kind": "parameter", "displayName": "Snapshot Mode", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "initial", "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The criteria for running a snapshot [...]
+    "snapshotSelectStatementOverrides": { "kind": "parameter", "displayName": "Snapshot Select Statement Overrides", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "This property contai [...]
     "sourceStructVersion": { "kind": "parameter", "displayName": "Source Struct Version", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "v2", "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "A version of the format o [...]
     "statusUpdateIntervalMs": { "kind": "parameter", "displayName": "Status Update Interval Ms", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "duration", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10s", "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Frequency for sending replic [...]
     "tableBlacklist": { "kind": "parameter", "displayName": "Table Blacklist", "group": "postgres", "label": "consumer,postgres", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.PostgresConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "A comma-separated list of regular expressions that match t [...]
diff --git a/components/camel-debezium/camel-debezium-postgres/src/main/docs/debezium-postgres-component.adoc b/components/camel-debezium/camel-debezium-postgres/src/main/docs/debezium-postgres-component.adoc
index 3822935..6ba3026 100644
--- a/components/camel-debezium/camel-debezium-postgres/src/main/docs/debezium-postgres-component.adoc
+++ b/components/camel-debezium/camel-debezium-postgres/src/main/docs/debezium-postgres-component.adoc
@@ -47,7 +47,7 @@ debezium-postgres:name[?options]
 
 
 // component options: START
-The Debezium PostgresSQL Connector component supports 88 options, which are listed below.
+The Debezium PostgresSQL Connector component supports 89 options, which are listed below.
 
 
 
@@ -98,6 +98,7 @@ The Debezium PostgresSQL Connector component supports 88 options, which are list
 | *heartbeatTopicsPrefix* (postgres) | The prefix that is used to name heartbeat topics. Defaults to __debezium-heartbeat. | __debezium-heartbeat | String
 | *hstoreHandlingMode* (postgres) | Specify how HSTORE columns should be represented in change events, including: 'json' represents values as string-ified JSON (default); 'map' represents values as a key/value map. | json | String
 | *includeUnknownDatatypes* (postgres) | Specify whether the fields of data type not supported by Debezium should be processed: 'false' (the default) omits the fields; 'true' converts the field into an implementation dependent binary representation. | false | boolean
+| *incrementalSnapshotChunkSize* (postgres) | The maximum size of chunk for incremental snapshotting | 1024 | int
 | *intervalHandlingMode* (postgres) | Specify how INTERVAL columns should be represented in change events, including: 'string' represents values as an exact ISO formatted string; 'numeric' (default) represents values using the inexact conversion into microseconds. | numeric | String
 | *maxBatchSize* (postgres) | Maximum size of each batch of source records. Defaults to 2048. | 2048 | int
 | *maxQueueSize* (postgres) | Maximum size of the queue for change events read from the database log but not yet recorded or forwarded. Defaults to 8192, and should always be larger than the maximum batch size. | 8192 | int
@@ -129,8 +130,8 @@ The Debezium PostgresSQL Connector component supports 88 options, which are list
 | *snapshotIncludeCollectionList* (postgres) | This setting must be set to specify a list of tables/collections whose snapshot must be taken on creating or restarting the connector. |  | String
 | *snapshotLockTimeoutMs* (postgres) | The maximum number of millis to wait for table locks at the beginning of a snapshot. If locks cannot be acquired in this time frame, the snapshot will be aborted. Defaults to 10 seconds | 10s | long
 | *snapshotMaxThreads* (postgres) | The maximum number of threads used to perform the snapshot. Defaults to 1. | 1 | int
-| *snapshotMode* (postgres) | The criteria for running a snapshot upon startup of the connector. Options include: 'always' to specify that the connector run a snapshot each time it starts up; 'initial' (the default) to specify the connector can run a snapshot only when no offsets are available for the logical server name; 'initial_only' same as 'initial' except the connector should stop after completing the snapshot and before it would normally start emitting changes;'never' to specify t [...]
-| *snapshotSelectStatement{zwsp}Overrides* (postgres) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on thespecific connectors . Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The  [...]
+| *snapshotMode* (postgres) | The criteria for running a snapshot upon startup of the connector. Options include: 'always' to specify that the connector run a snapshot each time it starts up; 'initial' (the default) to specify the connector can run a snapshot only when no offsets are available for the logical server name; 'initial_only' same as 'initial' except the connector should stop after completing the snapshot and before it would normally start emitting changes;'never' to specify t [...]
+| *snapshotSelectStatement{zwsp}Overrides* (postgres) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on the specific connectors. Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The v [...]
 | *sourceStructVersion* (postgres) | A version of the format of the publicly visible source part in the message | v2 | String
 | *statusUpdateIntervalMs* (postgres) | Frequency for sending replication connection status updates to the server, given in milliseconds. Defaults to 10 seconds (10,000 ms). | 10s | int
 | *tableBlacklist* (postgres) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring (deprecated, use table.exclude.list instead) |  | String
@@ -165,7 +166,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (88 parameters):
+=== Query Parameters (89 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -215,6 +216,7 @@ with the following path and query parameters:
 | *heartbeatTopicsPrefix* (postgres) | The prefix that is used to name heartbeat topics. Defaults to __debezium-heartbeat. | __debezium-heartbeat | String
 | *hstoreHandlingMode* (postgres) | Specify how HSTORE columns should be represented in change events, including: 'json' represents values as string-ified JSON (default); 'map' represents values as a key/value map. | json | String
 | *includeUnknownDatatypes* (postgres) | Specify whether the fields of data type not supported by Debezium should be processed: 'false' (the default) omits the fields; 'true' converts the field into an implementation dependent binary representation. | false | boolean
+| *incrementalSnapshotChunkSize* (postgres) | The maximum size of chunk for incremental snapshotting | 1024 | int
 | *intervalHandlingMode* (postgres) | Specify how INTERVAL columns should be represented in change events, including: 'string' represents values as an exact ISO formatted string; 'numeric' (default) represents values using the inexact conversion into microseconds. | numeric | String
 | *maxBatchSize* (postgres) | Maximum size of each batch of source records. Defaults to 2048. | 2048 | int
 | *maxQueueSize* (postgres) | Maximum size of the queue for change events read from the database log but not yet recorded or forwarded. Defaults to 8192, and should always be larger than the maximum batch size. | 8192 | int
@@ -246,8 +248,8 @@ with the following path and query parameters:
 | *snapshotIncludeCollectionList* (postgres) | This setting must be set to specify a list of tables/collections whose snapshot must be taken on creating or restarting the connector. |  | String
 | *snapshotLockTimeoutMs* (postgres) | The maximum number of millis to wait for table locks at the beginning of a snapshot. If locks cannot be acquired in this time frame, the snapshot will be aborted. Defaults to 10 seconds | 10s | long
 | *snapshotMaxThreads* (postgres) | The maximum number of threads used to perform the snapshot. Defaults to 1. | 1 | int
-| *snapshotMode* (postgres) | The criteria for running a snapshot upon startup of the connector. Options include: 'always' to specify that the connector run a snapshot each time it starts up; 'initial' (the default) to specify the connector can run a snapshot only when no offsets are available for the logical server name; 'initial_only' same as 'initial' except the connector should stop after completing the snapshot and before it would normally start emitting changes;'never' to specify t [...]
-| *snapshotSelectStatement{zwsp}Overrides* (postgres) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on thespecific connectors . Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The  [...]
+| *snapshotMode* (postgres) | The criteria for running a snapshot upon startup of the connector. Options include: 'always' to specify that the connector run a snapshot each time it starts up; 'initial' (the default) to specify the connector can run a snapshot only when no offsets are available for the logical server name; 'initial_only' same as 'initial' except the connector should stop after completing the snapshot and before it would normally start emitting changes;'never' to specify t [...]
+| *snapshotSelectStatement{zwsp}Overrides* (postgres) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on the specific connectors. Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The v [...]
 | *sourceStructVersion* (postgres) | A version of the format of the publicly visible source part in the message | v2 | String
 | *statusUpdateIntervalMs* (postgres) | Frequency for sending replication connection status updates to the server, given in milliseconds. Defaults to 10 seconds (10,000 ms). | 10s | int
 | *tableBlacklist* (postgres) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring (deprecated, use table.exclude.list instead) |  | String
diff --git a/components/camel-debezium/camel-debezium-sqlserver/src/generated/java/org/apache/camel/component/debezium/DebeziumSqlserverComponentConfigurer.java b/components/camel-debezium/camel-debezium-sqlserver/src/generated/java/org/apache/camel/component/debezium/DebeziumSqlserverComponentConfigurer.java
index 46592e8..bbff0e4 100644
--- a/components/camel-debezium/camel-debezium-sqlserver/src/generated/java/org/apache/camel/component/debezium/DebeziumSqlserverComponentConfigurer.java
+++ b/components/camel-debezium/camel-debezium-sqlserver/src/generated/java/org/apache/camel/component/debezium/DebeziumSqlserverComponentConfigurer.java
@@ -1,14 +1,9 @@
 /* Generated by camel build tools - do NOT edit this file! */
 package org.apache.camel.component.debezium;
 
-import java.util.Map;
-
 import org.apache.camel.CamelContext;
-import org.apache.camel.spi.ExtendedPropertyConfigurerGetter;
-import org.apache.camel.spi.PropertyConfigurerGetter;
-import org.apache.camel.spi.ConfigurerStrategy;
 import org.apache.camel.spi.GeneratedPropertyConfigurer;
-import org.apache.camel.util.CaseInsensitiveMap;
+import org.apache.camel.spi.PropertyConfigurerGetter;
 import org.apache.camel.support.component.PropertyConfigurerSupport;
 
 /**
@@ -62,6 +57,12 @@ public class DebeziumSqlserverComponentConfigurer extends PropertyConfigurerSupp
         case "databaseHistoryKafkaRecoveryPollIntervalMs": getOrCreateConfiguration(target).setDatabaseHistoryKafkaRecoveryPollIntervalMs(property(camelContext, int.class, value)); return true;
         case "databasehistorykafkatopic":
         case "databaseHistoryKafkaTopic": getOrCreateConfiguration(target).setDatabaseHistoryKafkaTopic(property(camelContext, java.lang.String.class, value)); return true;
+        case "databasehistoryskipunparseableddl":
+        case "databaseHistorySkipUnparseableDdl": getOrCreateConfiguration(target).setDatabaseHistorySkipUnparseableDdl(property(camelContext, boolean.class, value)); return true;
+        case "databasehistorystoreonlycapturedtablesddl":
+        case "databaseHistoryStoreOnlyCapturedTablesDdl": getOrCreateConfiguration(target).setDatabaseHistoryStoreOnlyCapturedTablesDdl(property(camelContext, boolean.class, value)); return true;
+        case "databasehistorystoreonlymonitoredtablesddl":
+        case "databaseHistoryStoreOnlyMonitoredTablesDdl": getOrCreateConfiguration(target).setDatabaseHistoryStoreOnlyMonitoredTablesDdl(property(camelContext, boolean.class, value)); return true;
         case "databasehostname":
         case "databaseHostname": getOrCreateConfiguration(target).setDatabaseHostname(property(camelContext, java.lang.String.class, value)); return true;
         case "databaseinstance":
@@ -94,6 +95,8 @@ public class DebeziumSqlserverComponentConfigurer extends PropertyConfigurerSupp
         case "internalValueConverter": getOrCreateConfiguration(target).setInternalValueConverter(property(camelContext, java.lang.String.class, value)); return true;
         case "maxbatchsize":
         case "maxBatchSize": getOrCreateConfiguration(target).setMaxBatchSize(property(camelContext, int.class, value)); return true;
+        case "maxiterationtransactions":
+        case "maxIterationTransactions": getOrCreateConfiguration(target).setMaxIterationTransactions(property(camelContext, int.class, value)); return true;
         case "maxqueuesize":
         case "maxQueueSize": getOrCreateConfiguration(target).setMaxQueueSize(property(camelContext, int.class, value)); return true;
         case "maxqueuesizeinbytes":
@@ -205,6 +208,12 @@ public class DebeziumSqlserverComponentConfigurer extends PropertyConfigurerSupp
         case "databaseHistoryKafkaRecoveryPollIntervalMs": return int.class;
         case "databasehistorykafkatopic":
         case "databaseHistoryKafkaTopic": return java.lang.String.class;
+        case "databasehistoryskipunparseableddl":
+        case "databaseHistorySkipUnparseableDdl": return boolean.class;
+        case "databasehistorystoreonlycapturedtablesddl":
+        case "databaseHistoryStoreOnlyCapturedTablesDdl": return boolean.class;
+        case "databasehistorystoreonlymonitoredtablesddl":
+        case "databaseHistoryStoreOnlyMonitoredTablesDdl": return boolean.class;
         case "databasehostname":
         case "databaseHostname": return java.lang.String.class;
         case "databaseinstance":
@@ -237,6 +246,8 @@ public class DebeziumSqlserverComponentConfigurer extends PropertyConfigurerSupp
         case "internalValueConverter": return java.lang.String.class;
         case "maxbatchsize":
         case "maxBatchSize": return int.class;
+        case "maxiterationtransactions":
+        case "maxIterationTransactions": return int.class;
         case "maxqueuesize":
         case "maxQueueSize": return int.class;
         case "maxqueuesizeinbytes":
@@ -349,6 +360,12 @@ public class DebeziumSqlserverComponentConfigurer extends PropertyConfigurerSupp
         case "databaseHistoryKafkaRecoveryPollIntervalMs": return getOrCreateConfiguration(target).getDatabaseHistoryKafkaRecoveryPollIntervalMs();
         case "databasehistorykafkatopic":
         case "databaseHistoryKafkaTopic": return getOrCreateConfiguration(target).getDatabaseHistoryKafkaTopic();
+        case "databasehistoryskipunparseableddl":
+        case "databaseHistorySkipUnparseableDdl": return getOrCreateConfiguration(target).isDatabaseHistorySkipUnparseableDdl();
+        case "databasehistorystoreonlycapturedtablesddl":
+        case "databaseHistoryStoreOnlyCapturedTablesDdl": return getOrCreateConfiguration(target).isDatabaseHistoryStoreOnlyCapturedTablesDdl();
+        case "databasehistorystoreonlymonitoredtablesddl":
+        case "databaseHistoryStoreOnlyMonitoredTablesDdl": return getOrCreateConfiguration(target).isDatabaseHistoryStoreOnlyMonitoredTablesDdl();
         case "databasehostname":
         case "databaseHostname": return getOrCreateConfiguration(target).getDatabaseHostname();
         case "databaseinstance":
@@ -381,6 +398,8 @@ public class DebeziumSqlserverComponentConfigurer extends PropertyConfigurerSupp
         case "internalValueConverter": return getOrCreateConfiguration(target).getInternalValueConverter();
         case "maxbatchsize":
         case "maxBatchSize": return getOrCreateConfiguration(target).getMaxBatchSize();
+        case "maxiterationtransactions":
+        case "maxIterationTransactions": return getOrCreateConfiguration(target).getMaxIterationTransactions();
         case "maxqueuesize":
         case "maxQueueSize": return getOrCreateConfiguration(target).getMaxQueueSize();
         case "maxqueuesizeinbytes":
diff --git a/components/camel-debezium/camel-debezium-sqlserver/src/generated/java/org/apache/camel/component/debezium/DebeziumSqlserverEndpointConfigurer.java b/components/camel-debezium/camel-debezium-sqlserver/src/generated/java/org/apache/camel/component/debezium/DebeziumSqlserverEndpointConfigurer.java
index c382e14..e1d9639 100644
--- a/components/camel-debezium/camel-debezium-sqlserver/src/generated/java/org/apache/camel/component/debezium/DebeziumSqlserverEndpointConfigurer.java
+++ b/components/camel-debezium/camel-debezium-sqlserver/src/generated/java/org/apache/camel/component/debezium/DebeziumSqlserverEndpointConfigurer.java
@@ -1,14 +1,9 @@
 /* Generated by camel build tools - do NOT edit this file! */
 package org.apache.camel.component.debezium;
 
-import java.util.Map;
-
 import org.apache.camel.CamelContext;
-import org.apache.camel.spi.ExtendedPropertyConfigurerGetter;
-import org.apache.camel.spi.PropertyConfigurerGetter;
-import org.apache.camel.spi.ConfigurerStrategy;
 import org.apache.camel.spi.GeneratedPropertyConfigurer;
-import org.apache.camel.util.CaseInsensitiveMap;
+import org.apache.camel.spi.PropertyConfigurerGetter;
 import org.apache.camel.support.component.PropertyConfigurerSupport;
 
 /**
@@ -52,6 +47,12 @@ public class DebeziumSqlserverEndpointConfigurer extends PropertyConfigurerSuppo
         case "databaseHistoryKafkaRecoveryPollIntervalMs": target.getConfiguration().setDatabaseHistoryKafkaRecoveryPollIntervalMs(property(camelContext, int.class, value)); return true;
         case "databasehistorykafkatopic":
         case "databaseHistoryKafkaTopic": target.getConfiguration().setDatabaseHistoryKafkaTopic(property(camelContext, java.lang.String.class, value)); return true;
+        case "databasehistoryskipunparseableddl":
+        case "databaseHistorySkipUnparseableDdl": target.getConfiguration().setDatabaseHistorySkipUnparseableDdl(property(camelContext, boolean.class, value)); return true;
+        case "databasehistorystoreonlycapturedtablesddl":
+        case "databaseHistoryStoreOnlyCapturedTablesDdl": target.getConfiguration().setDatabaseHistoryStoreOnlyCapturedTablesDdl(property(camelContext, boolean.class, value)); return true;
+        case "databasehistorystoreonlymonitoredtablesddl":
+        case "databaseHistoryStoreOnlyMonitoredTablesDdl": target.getConfiguration().setDatabaseHistoryStoreOnlyMonitoredTablesDdl(property(camelContext, boolean.class, value)); return true;
         case "databasehostname":
         case "databaseHostname": target.getConfiguration().setDatabaseHostname(property(camelContext, java.lang.String.class, value)); return true;
         case "databaseinstance":
@@ -88,6 +89,8 @@ public class DebeziumSqlserverEndpointConfigurer extends PropertyConfigurerSuppo
         case "internalValueConverter": target.getConfiguration().setInternalValueConverter(property(camelContext, java.lang.String.class, value)); return true;
         case "maxbatchsize":
         case "maxBatchSize": target.getConfiguration().setMaxBatchSize(property(camelContext, int.class, value)); return true;
+        case "maxiterationtransactions":
+        case "maxIterationTransactions": target.getConfiguration().setMaxIterationTransactions(property(camelContext, int.class, value)); return true;
         case "maxqueuesize":
         case "maxQueueSize": target.getConfiguration().setMaxQueueSize(property(camelContext, int.class, value)); return true;
         case "maxqueuesizeinbytes":
@@ -196,6 +199,12 @@ public class DebeziumSqlserverEndpointConfigurer extends PropertyConfigurerSuppo
         case "databaseHistoryKafkaRecoveryPollIntervalMs": return int.class;
         case "databasehistorykafkatopic":
         case "databaseHistoryKafkaTopic": return java.lang.String.class;
+        case "databasehistoryskipunparseableddl":
+        case "databaseHistorySkipUnparseableDdl": return boolean.class;
+        case "databasehistorystoreonlycapturedtablesddl":
+        case "databaseHistoryStoreOnlyCapturedTablesDdl": return boolean.class;
+        case "databasehistorystoreonlymonitoredtablesddl":
+        case "databaseHistoryStoreOnlyMonitoredTablesDdl": return boolean.class;
         case "databasehostname":
         case "databaseHostname": return java.lang.String.class;
         case "databaseinstance":
@@ -232,6 +241,8 @@ public class DebeziumSqlserverEndpointConfigurer extends PropertyConfigurerSuppo
         case "internalValueConverter": return java.lang.String.class;
         case "maxbatchsize":
         case "maxBatchSize": return int.class;
+        case "maxiterationtransactions":
+        case "maxIterationTransactions": return int.class;
         case "maxqueuesize":
         case "maxQueueSize": return int.class;
         case "maxqueuesizeinbytes":
@@ -341,6 +352,12 @@ public class DebeziumSqlserverEndpointConfigurer extends PropertyConfigurerSuppo
         case "databaseHistoryKafkaRecoveryPollIntervalMs": return target.getConfiguration().getDatabaseHistoryKafkaRecoveryPollIntervalMs();
         case "databasehistorykafkatopic":
         case "databaseHistoryKafkaTopic": return target.getConfiguration().getDatabaseHistoryKafkaTopic();
+        case "databasehistoryskipunparseableddl":
+        case "databaseHistorySkipUnparseableDdl": return target.getConfiguration().isDatabaseHistorySkipUnparseableDdl();
+        case "databasehistorystoreonlycapturedtablesddl":
+        case "databaseHistoryStoreOnlyCapturedTablesDdl": return target.getConfiguration().isDatabaseHistoryStoreOnlyCapturedTablesDdl();
+        case "databasehistorystoreonlymonitoredtablesddl":
+        case "databaseHistoryStoreOnlyMonitoredTablesDdl": return target.getConfiguration().isDatabaseHistoryStoreOnlyMonitoredTablesDdl();
         case "databasehostname":
         case "databaseHostname": return target.getConfiguration().getDatabaseHostname();
         case "databaseinstance":
@@ -377,6 +394,8 @@ public class DebeziumSqlserverEndpointConfigurer extends PropertyConfigurerSuppo
         case "internalValueConverter": return target.getConfiguration().getInternalValueConverter();
         case "maxbatchsize":
         case "maxBatchSize": return target.getConfiguration().getMaxBatchSize();
+        case "maxiterationtransactions":
+        case "maxIterationTransactions": return target.getConfiguration().getMaxIterationTransactions();
         case "maxqueuesize":
         case "maxQueueSize": return target.getConfiguration().getMaxQueueSize();
         case "maxqueuesizeinbytes":
diff --git a/components/camel-debezium/camel-debezium-sqlserver/src/generated/java/org/apache/camel/component/debezium/DebeziumSqlserverEndpointUriFactory.java b/components/camel-debezium/camel-debezium-sqlserver/src/generated/java/org/apache/camel/component/debezium/DebeziumSqlserverEndpointUriFactory.java
index d0470aa..bebb359 100644
--- a/components/camel-debezium/camel-debezium-sqlserver/src/generated/java/org/apache/camel/component/debezium/DebeziumSqlserverEndpointUriFactory.java
+++ b/components/camel-debezium/camel-debezium-sqlserver/src/generated/java/org/apache/camel/component/debezium/DebeziumSqlserverEndpointUriFactory.java
@@ -20,7 +20,7 @@ public class DebeziumSqlserverEndpointUriFactory extends org.apache.camel.suppor
     private static final Set<String> PROPERTY_NAMES;
     private static final Set<String> SECRET_PROPERTY_NAMES;
     static {
-        Set<String> props = new HashSet<>(70);
+        Set<String> props = new HashSet<>(74);
         props.add("maxBatchSize");
         props.add("internalKeyConverter");
         props.add("snapshotDelayMs");
@@ -41,6 +41,7 @@ public class DebeziumSqlserverEndpointUriFactory extends org.apache.camel.suppor
         props.add("databaseServerTimezone");
         props.add("binaryHandlingMode");
         props.add("databaseHostname");
+        props.add("databaseHistorySkipUnparseableDdl");
         props.add("eventProcessingFailureHandlingMode");
         props.add("offsetCommitTimeoutMs");
         props.add("snapshotSelectStatementOverrides");
@@ -50,6 +51,7 @@ public class DebeziumSqlserverEndpointUriFactory extends org.apache.camel.suppor
         props.add("name");
         props.add("snapshotFetchSize");
         props.add("offsetStoragePartitions");
+        props.add("maxIterationTransactions");
         props.add("additionalProperties");
         props.add("offsetStorageReplicationFactor");
         props.add("exceptionHandler");
@@ -60,6 +62,7 @@ public class DebeziumSqlserverEndpointUriFactory extends org.apache.camel.suppor
         props.add("databaseHistoryFileFilename");
         props.add("databaseHistory");
         props.add("columnPropagateSourceType");
+        props.add("databaseHistoryStoreOnlyCapturedTablesDdl");
         props.add("offsetStorage");
         props.add("includeSchemaChanges");
         props.add("internalValueConverter");
@@ -73,6 +76,7 @@ public class DebeziumSqlserverEndpointUriFactory extends org.apache.camel.suppor
         props.add("sanitizeFieldNames");
         props.add("databaseHistoryKafkaTopic");
         props.add("tableWhitelist");
+        props.add("databaseHistoryStoreOnlyMonitoredTablesDdl");
         props.add("tableIgnoreBuiltin");
         props.add("signalDataCollection");
         props.add("exchangePattern");
diff --git a/components/camel-debezium/camel-debezium-sqlserver/src/generated/java/org/apache/camel/component/debezium/configuration/SqlServerConnectorEmbeddedDebeziumConfiguration.java b/components/camel-debezium/camel-debezium-sqlserver/src/generated/java/org/apache/camel/component/debezium/configuration/SqlServerConnectorEmbeddedDebeziumConfiguration.java
index 5356385..b1889c4 100644
--- a/components/camel-debezium/camel-debezium-sqlserver/src/generated/java/org/apache/camel/component/debezium/configuration/SqlServerConnectorEmbeddedDebeziumConfiguration.java
+++ b/components/camel-debezium/camel-debezium-sqlserver/src/generated/java/org/apache/camel/component/debezium/configuration/SqlServerConnectorEmbeddedDebeziumConfiguration.java
@@ -30,6 +30,8 @@ public class SqlServerConnectorEmbeddedDebeziumConfiguration
     private int databaseHistoryKafkaRecoveryPollIntervalMs = 100;
     @UriParam(label = LABEL_NAME)
     private String signalDataCollection;
+    @UriParam(label = LABEL_NAME, defaultValue = "false")
+    private boolean databaseHistoryStoreOnlyCapturedTablesDdl = false;
     @UriParam(label = LABEL_NAME)
     private String converters;
     @UriParam(label = LABEL_NAME, defaultValue = "__debezium-heartbeat")
@@ -67,6 +69,8 @@ public class SqlServerConnectorEmbeddedDebeziumConfiguration
     @UriParam(label = LABEL_NAME)
     @Metadata(required = true)
     private String databasePassword;
+    @UriParam(label = LABEL_NAME, defaultValue = "false")
+    private boolean databaseHistoryStoreOnlyMonitoredTablesDdl = false;
     @UriParam(label = LABEL_NAME, defaultValue = "2048")
     private int maxBatchSize = 2048;
     @UriParam(label = LABEL_NAME)
@@ -97,6 +101,8 @@ public class SqlServerConnectorEmbeddedDebeziumConfiguration
     private String decimalHandlingMode = "precise";
     @UriParam(label = LABEL_NAME, defaultValue = "bytes")
     private String binaryHandlingMode = "bytes";
+    @UriParam(label = LABEL_NAME, defaultValue = "false")
+    private boolean databaseHistorySkipUnparseableDdl = false;
     @UriParam(label = LABEL_NAME, defaultValue = "true")
     private boolean tableIgnoreBuiltin = true;
     @UriParam(label = LABEL_NAME)
@@ -118,6 +124,8 @@ public class SqlServerConnectorEmbeddedDebeziumConfiguration
     private int snapshotMaxThreads = 1;
     @UriParam(label = LABEL_NAME, defaultValue = "1433")
     private int databasePort = 1433;
+    @UriParam(label = LABEL_NAME, defaultValue = "0")
+    private int maxIterationTransactions = 0;
     @UriParam(label = LABEL_NAME)
     private String columnExcludeList;
     @UriParam(label = LABEL_NAME)
@@ -246,6 +254,20 @@ public class SqlServerConnectorEmbeddedDebeziumConfiguration
     }
 
     /**
+     * Controls which DDL statements Debezium stores in the database history.
+     * By default (false) Debezium stores all incoming DDL statements. If set
+     * to true, only DDL that manipulates a captured table is stored.
+     */
+    public void setDatabaseHistoryStoreOnlyCapturedTablesDdl(
+            boolean databaseHistoryStoreOnlyCapturedTablesDdl) {
+        this.databaseHistoryStoreOnlyCapturedTablesDdl = databaseHistoryStoreOnlyCapturedTablesDdl;
+    }
+
+    public boolean isDatabaseHistoryStoreOnlyCapturedTablesDdl() {
+        return databaseHistoryStoreOnlyCapturedTablesDdl;
+    }
+
+    /**
      * Optional list of custom converters that would be used instead of default
      * ones. The converters are defined using '<converter.prefix>.type' config
      * option and configured using options '<converter.prefix>.<option>'
@@ -299,8 +321,8 @@ public class SqlServerConnectorEmbeddedDebeziumConfiguration
      * Configures the criteria of the attached timestamp within the source
      * record (ts_ms).Options include:'commit', (default) the source timestamp
      * is set to the instant where the record was committed in the
-     * database'processing', the source timestamp is set to the instant where
-     * the record was processed by Debezium.
+     * database; 'processing', (deprecated) the source timestamp is set to the
+     * instant where the record was processed by Debezium.
      */
     public void setSourceTimestampMode(String sourceTimestampMode) {
         this.sourceTimestampMode = sourceTimestampMode;
@@ -361,7 +383,7 @@ public class SqlServerConnectorEmbeddedDebeziumConfiguration
     /**
      *  This property contains a comma-separated list of fully-qualified tables
      * (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on
-     * thespecific connectors . Select statements for the individual tables are
+     * the specific connectors. Select statements for the individual tables are
      * specified in further configuration properties, one for each table,
      * identified by the id
      * 'snapshot.select.statement.overrides.[DB_NAME].[TABLE_NAME]' or
@@ -482,6 +504,22 @@ public class SqlServerConnectorEmbeddedDebeziumConfiguration
     }
 
     /**
+     * Controls which DDL statements Debezium stores in the database history.
+     * By default (false) Debezium stores all incoming DDL statements. If set
+     * to true, only DDL that manipulates a monitored table is stored.
+     * Deprecated: use "database.history.store.only.captured.tables.ddl"
+     * instead.
+     */
+    public void setDatabaseHistoryStoreOnlyMonitoredTablesDdl(
+            boolean databaseHistoryStoreOnlyMonitoredTablesDdl) {
+        this.databaseHistoryStoreOnlyMonitoredTablesDdl = databaseHistoryStoreOnlyMonitoredTablesDdl;
+    }
+
+    public boolean isDatabaseHistoryStoreOnlyMonitoredTablesDdl() {
+        return databaseHistoryStoreOnlyMonitoredTablesDdl;
+    }
+
+    /**
      * Maximum size of each batch of source records. Defaults to 2048.
      */
     public void setMaxBatchSize(int maxBatchSize) {
@@ -678,6 +716,21 @@ public class SqlServerConnectorEmbeddedDebeziumConfiguration
     }
 
     /**
+     * Controls the action Debezium takes when it meets a DDL statement in the
+     * binlog that it cannot parse. By default the connector stops operating,
+     * but by changing the setting it can ignore the statements it cannot
+     * parse. If skipping is enabled, Debezium can miss metadata changes.
+     */
+    public void setDatabaseHistorySkipUnparseableDdl(
+            boolean databaseHistorySkipUnparseableDdl) {
+        this.databaseHistorySkipUnparseableDdl = databaseHistorySkipUnparseableDdl;
+    }
+
+    public boolean isDatabaseHistorySkipUnparseableDdl() {
+        return databaseHistorySkipUnparseableDdl;
+    }
+
+    /**
      * Flag specifying whether built-in tables should be ignored.
      */
     public void setTableIgnoreBuiltin(boolean tableIgnoreBuiltin) {
@@ -824,6 +877,18 @@ public class SqlServerConnectorEmbeddedDebeziumConfiguration
     }
 
     /**
+     * This property can be used to reduce the connector memory usage footprint
+     * when changes are streamed from multiple tables per database.
+     */
+    public void setMaxIterationTransactions(int maxIterationTransactions) {
+        this.maxIterationTransactions = maxIterationTransactions;
+    }
+
+    public int getMaxIterationTransactions() {
+        return maxIterationTransactions;
+    }
+
+    /**
      * Regular expressions matching columns to exclude from change events
      */
     public void setColumnExcludeList(String columnExcludeList) {
@@ -869,6 +934,7 @@ public class SqlServerConnectorEmbeddedDebeziumConfiguration
         addPropertyIfNotNull(configBuilder, "poll.interval.ms", pollIntervalMs);
         addPropertyIfNotNull(configBuilder, "database.history.kafka.recovery.poll.interval.ms", databaseHistoryKafkaRecoveryPollIntervalMs);
         addPropertyIfNotNull(configBuilder, "signal.data.collection", signalDataCollection);
+        addPropertyIfNotNull(configBuilder, "database.history.store.only.captured.tables.ddl", databaseHistoryStoreOnlyCapturedTablesDdl);
         addPropertyIfNotNull(configBuilder, "converters", converters);
         addPropertyIfNotNull(configBuilder, "heartbeat.topics.prefix", heartbeatTopicsPrefix);
         addPropertyIfNotNull(configBuilder, "snapshot.fetch.size", snapshotFetchSize);
@@ -887,6 +953,7 @@ public class SqlServerConnectorEmbeddedDebeziumConfiguration
         addPropertyIfNotNull(configBuilder, "column.propagate.source.type", columnPropagateSourceType);
         addPropertyIfNotNull(configBuilder, "table.exclude.list", tableExcludeList);
         addPropertyIfNotNull(configBuilder, "database.password", databasePassword);
+        addPropertyIfNotNull(configBuilder, "database.history.store.only.monitored.tables.ddl", databaseHistoryStoreOnlyMonitoredTablesDdl);
         addPropertyIfNotNull(configBuilder, "max.batch.size", maxBatchSize);
         addPropertyIfNotNull(configBuilder, "skipped.operations", skippedOperations);
         addPropertyIfNotNull(configBuilder, "snapshot.mode", snapshotMode);
@@ -902,6 +969,7 @@ public class SqlServerConnectorEmbeddedDebeziumConfiguration
         addPropertyIfNotNull(configBuilder, "tombstones.on.delete", tombstonesOnDelete);
         addPropertyIfNotNull(configBuilder, "decimal.handling.mode", decimalHandlingMode);
         addPropertyIfNotNull(configBuilder, "binary.handling.mode", binaryHandlingMode);
+        addPropertyIfNotNull(configBuilder, "database.history.skip.unparseable.ddl", databaseHistorySkipUnparseableDdl);
         addPropertyIfNotNull(configBuilder, "table.ignore.builtin", tableIgnoreBuiltin);
         addPropertyIfNotNull(configBuilder, "snapshot.include.collection.list", snapshotIncludeCollectionList);
         addPropertyIfNotNull(configBuilder, "database.history.file.filename", databaseHistoryFileFilename);
@@ -912,6 +980,7 @@ public class SqlServerConnectorEmbeddedDebeziumConfiguration
         addPropertyIfNotNull(configBuilder, "snapshot.isolation.mode", snapshotIsolationMode);
         addPropertyIfNotNull(configBuilder, "snapshot.max.threads", snapshotMaxThreads);
         addPropertyIfNotNull(configBuilder, "database.port", databasePort);
+        addPropertyIfNotNull(configBuilder, "max.iteration.transactions", maxIterationTransactions);
         addPropertyIfNotNull(configBuilder, "column.exclude.list", columnExcludeList);
         addPropertyIfNotNull(configBuilder, "database.hostname", databaseHostname);
         addPropertyIfNotNull(configBuilder, "table.include.list", tableIncludeList);
diff --git a/components/camel-debezium/camel-debezium-sqlserver/src/generated/resources/org/apache/camel/component/debezium/debezium-sqlserver.json b/components/camel-debezium/camel-debezium-sqlserver/src/generated/resources/org/apache/camel/component/debezium/debezium-sqlserver.json
index 41bf768..dbeac8f 100644
--- a/components/camel-debezium/camel-debezium-sqlserver/src/generated/resources/org/apache/camel/component/debezium/debezium-sqlserver.json
+++ b/components/camel-debezium/camel-debezium-sqlserver/src/generated/resources/org/apache/camel/component/debezium/debezium-sqlserver.json
@@ -50,6 +50,9 @@
     "databaseHistoryKafkaRecoveryAttempts": { "kind": "property", "displayName": "Database History Kafka Recovery Attempts", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 100, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": " [...]
     "databaseHistoryKafkaRecoveryPollIntervalMs": { "kind": "property", "displayName": "Database History Kafka Recovery Poll Interval Ms", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "duration", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "100ms", "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration [...]
     "databaseHistoryKafkaTopic": { "kind": "property", "displayName": "Database History Kafka Topic", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The name of the topic for the da [...]
+    "databaseHistorySkipUnparseableDdl": { "kind": "property", "displayName": "Database History Skip Unparseable Ddl", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": " [...]
+    "databaseHistoryStoreOnlyCapturedTablesDdl": { "kind": "property", "displayName": "Database History Store Only Captured Tables Ddl", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration" [...]
+    "databaseHistoryStoreOnlyMonitoredTablesDdl": { "kind": "property", "displayName": "Database History Store Only Monitored Tables Ddl", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuratio [...]
     "databaseHostname": { "kind": "property", "displayName": "Database Hostname", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Resolvable hostname or IP address of the database se [...]
     "databaseInstance": { "kind": "property", "displayName": "Database Instance", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The SQL Server instance name" },
     "databasePassword": { "kind": "property", "displayName": "Database Password", "group": "sqlserver", "label": "consumer,sqlserver", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Password of the database user  [...]
@@ -64,6 +67,7 @@
     "heartbeatTopicsPrefix": { "kind": "property", "displayName": "Heartbeat Topics Prefix", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "__debezium-heartbeat", "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "T [...]
     "includeSchemaChanges": { "kind": "property", "displayName": "Include Schema Changes", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Whether the connector should  [...]
     "maxBatchSize": { "kind": "property", "displayName": "Max Batch Size", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 2048, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Maximum size of each batch of source records. Def [...]
+    "maxIterationTransactions": { "kind": "property", "displayName": "Max Iteration Transactions", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 0, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "This property can be used to [...]
     "maxQueueSize": { "kind": "property", "displayName": "Max Queue Size", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 8192, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Maximum size of the queue for change events read  [...]
     "maxQueueSizeInBytes": { "kind": "property", "displayName": "Max Queue Size In Bytes", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 0, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Maximum size of the queue in bytes  [...]
     "messageKeyColumns": { "kind": "property", "displayName": "Message Key Columns", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "A semicolon-separated list of expressions that ma [...]
@@ -81,9 +85,9 @@
     "snapshotLockTimeoutMs": { "kind": "property", "displayName": "Snapshot Lock Timeout Ms", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10s", "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The maximum number of milli [...]
     "snapshotMaxThreads": { "kind": "property", "displayName": "Snapshot Max Threads", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The maximum number of threads used to pe [...]
     "snapshotMode": { "kind": "property", "displayName": "Snapshot Mode", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "initial", "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The criteria for running a snapsh [...]
-    "snapshotSelectStatementOverrides": { "kind": "property", "displayName": "Snapshot Select Statement Overrides", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "This property cont [...]
+    "snapshotSelectStatementOverrides": { "kind": "property", "displayName": "Snapshot Select Statement Overrides", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "This property cont [...]
     "sourceStructVersion": { "kind": "property", "displayName": "Source Struct Version", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "v2", "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "A version of the format [...]
-    "sourceTimestampMode": { "kind": "property", "displayName": "Source Timestamp Mode", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "commit", "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Configures the crit [...]
+    "sourceTimestampMode": { "kind": "property", "displayName": "Source Timestamp Mode", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "commit", "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Configures the crit [...]
     "tableBlacklist": { "kind": "property", "displayName": "Table Blacklist", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "A comma-separated list of regular expressions that match [...]
     "tableExcludeList": { "kind": "property", "displayName": "Table Exclude List", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "A comma-separated list of regular expressions that  [...]
     "tableIgnoreBuiltin": { "kind": "property", "displayName": "Table Ignore Builtin", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Flag specifying whether built-in  [...]
@@ -122,6 +126,9 @@
     "databaseHistoryKafkaRecoveryAttempts": { "kind": "parameter", "displayName": "Database History Kafka Recovery Attempts", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 100, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description":  [...]
     "databaseHistoryKafkaRecoveryPollIntervalMs": { "kind": "parameter", "displayName": "Database History Kafka Recovery Poll Interval Ms", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "duration", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "100ms", "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuratio [...]
     "databaseHistoryKafkaTopic": { "kind": "parameter", "displayName": "Database History Kafka Topic", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The name of the topic for the d [...]
+    "databaseHistorySkipUnparseableDdl": { "kind": "parameter", "displayName": "Database History Skip Unparseable Ddl", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description":  [...]
+    "databaseHistoryStoreOnlyCapturedTablesDdl": { "kind": "parameter", "displayName": "Database History Store Only Captured Tables Ddl", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration [...]
+    "databaseHistoryStoreOnlyMonitoredTablesDdl": { "kind": "parameter", "displayName": "Database History Store Only Monitored Tables Ddl", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configurati [...]
     "databaseHostname": { "kind": "parameter", "displayName": "Database Hostname", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Resolvable hostname or IP address of the database s [...]
     "databaseInstance": { "kind": "parameter", "displayName": "Database Instance", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The SQL Server instance name" },
     "databasePassword": { "kind": "parameter", "displayName": "Database Password", "group": "sqlserver", "label": "consumer,sqlserver", "required": true, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Password of the database user [...]
@@ -136,6 +143,7 @@
     "heartbeatTopicsPrefix": { "kind": "parameter", "displayName": "Heartbeat Topics Prefix", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "__debezium-heartbeat", "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": " [...]
     "includeSchemaChanges": { "kind": "parameter", "displayName": "Include Schema Changes", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Whether the connector should [...]
     "maxBatchSize": { "kind": "parameter", "displayName": "Max Batch Size", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 2048, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Maximum size of each batch of source records. De [...]
+    "maxIterationTransactions": { "kind": "parameter", "displayName": "Max Iteration Transactions", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 0, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "This property can be used t [...]
     "maxQueueSize": { "kind": "parameter", "displayName": "Max Queue Size", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 8192, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Maximum size of the queue for change events read [...]
     "maxQueueSizeInBytes": { "kind": "parameter", "displayName": "Max Queue Size In Bytes", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 0, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Maximum size of the queue in bytes [...]
     "messageKeyColumns": { "kind": "parameter", "displayName": "Message Key Columns", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "A semicolon-separated list of expressions that m [...]
@@ -153,9 +161,9 @@
     "snapshotLockTimeoutMs": { "kind": "parameter", "displayName": "Snapshot Lock Timeout Ms", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "duration", "javaType": "long", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "10s", "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The maximum number of mill [...]
     "snapshotMaxThreads": { "kind": "parameter", "displayName": "Snapshot Max Threads", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The maximum number of threads used to p [...]
     "snapshotMode": { "kind": "parameter", "displayName": "Snapshot Mode", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "initial", "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "The criteria for running a snaps [...]
-    "snapshotSelectStatementOverrides": { "kind": "parameter", "displayName": "Snapshot Select Statement Overrides", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "This property con [...]
+    "snapshotSelectStatementOverrides": { "kind": "parameter", "displayName": "Snapshot Select Statement Overrides", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "This property con [...]
     "sourceStructVersion": { "kind": "parameter", "displayName": "Source Struct Version", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "v2", "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "A version of the forma [...]
-    "sourceTimestampMode": { "kind": "parameter", "displayName": "Source Timestamp Mode", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "commit", "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Configures the cri [...]
+    "sourceTimestampMode": { "kind": "parameter", "displayName": "Source Timestamp Mode", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "commit", "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Configures the cri [...]
     "tableBlacklist": { "kind": "parameter", "displayName": "Table Blacklist", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "A comma-separated list of regular expressions that matc [...]
     "tableExcludeList": { "kind": "parameter", "displayName": "Table Exclude List", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "A comma-separated list of regular expressions that [...]
     "tableIgnoreBuiltin": { "kind": "parameter", "displayName": "Table Ignore Builtin", "group": "sqlserver", "label": "consumer,sqlserver", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.debezium.configuration.SqlServerConnectorEmbeddedDebeziumConfiguration", "configurationField": "configuration", "description": "Flag specifying whether built-in [...]
diff --git a/components/camel-debezium/camel-debezium-sqlserver/src/main/docs/debezium-sqlserver-component.adoc b/components/camel-debezium/camel-debezium-sqlserver/src/main/docs/debezium-sqlserver-component.adoc
index 51b14c8..173c349 100644
--- a/components/camel-debezium/camel-debezium-sqlserver/src/main/docs/debezium-sqlserver-component.adoc
+++ b/components/camel-debezium/camel-debezium-sqlserver/src/main/docs/debezium-sqlserver-component.adoc
@@ -46,7 +46,7 @@ debezium-sqlserver:name[?options]
 
 
 // component options: START
-The Debezium SQL Server Connector component supports 69 options, which are listed below.
+The Debezium SQL Server Connector component supports 73 options, which are listed below.
 
 
 
@@ -81,6 +81,9 @@ The Debezium SQL Server Connector component supports 69 options, which are liste
 | *databaseHistoryKafkaRecovery{zwsp}Attempts* (sqlserver) | The number of attempts in a row that no data are returned from Kafka before recover completes. The maximum amount of time to wait after receiving no data is (recovery.attempts) x (recovery.poll.interval.ms). | 100 | int
 | *databaseHistoryKafkaRecovery{zwsp}PollIntervalMs* (sqlserver) | The number of milliseconds to wait while polling for persisted data during recovery. | 100ms | int
 | *databaseHistoryKafkaTopic* (sqlserver) | The name of the topic for the database schema history |  | String
+| *databaseHistorySkipUnparseable{zwsp}Ddl* (sqlserver) | Controls the action Debezium will take when it meets a DDL statement in the binlog that it cannot parse. By default the connector will stop operating, but by changing the setting it can ignore the statements which it cannot parse. If skipping is enabled then Debezium can miss metadata changes. | false | boolean
+| *databaseHistoryStoreOnly{zwsp}CapturedTablesDdl* (sqlserver) | Controls what DDL will Debezium store in database history. By default (false) Debezium will store all incoming DDL statements. If set to true, then only DDL that manipulates a captured table will be stored. | false | boolean
+| *databaseHistoryStoreOnly{zwsp}MonitoredTablesDdl* (sqlserver) | Controls what DDL will Debezium store in database history. By default (false) Debezium will store all incoming DDL statements. If set to true, then only DDL that manipulates a monitored table will be stored (deprecated, use database.history.store.only.captured.tables.ddl instead) | false | boolean
 | *databaseHostname* (sqlserver) | Resolvable hostname or IP address of the database server. |  | String
 | *databaseInstance* (sqlserver) | The SQL Server instance name |  | String
 | *databasePassword* (sqlserver) | *Required* Password of the database user to be used when connecting to the database. |  | String
@@ -95,6 +98,7 @@ The Debezium SQL Server Connector component supports 69 options, which are liste
 | *heartbeatTopicsPrefix* (sqlserver) | The prefix that is used to name heartbeat topics.Defaults to __debezium-heartbeat. | __debezium-heartbeat | String
 | *includeSchemaChanges* (sqlserver) | Whether the connector should publish changes in the database schema to a Kafka topic with the same name as the database server ID. Each schema change will be recorded using a key that contains the database name and whose value include logical description of the new schema and optionally the DDL statement(s).The default is 'true'. This is independent of how the connector internally records database history. | true | boolean
 | *maxBatchSize* (sqlserver) | Maximum size of each batch of source records. Defaults to 2048. | 2048 | int
+| *maxIterationTransactions* (sqlserver) | This property can be used to reduce the connector memory usage footprint when changes are streamed from multiple tables per database. | 0 | int
 | *maxQueueSize* (sqlserver) | Maximum size of the queue for change events read from the database log but not yet recorded or forwarded. Defaults to 8192, and should always be larger than the maximum batch size. | 8192 | int
 | *maxQueueSizeInBytes* (sqlserver) | Maximum size of the queue in bytes for change events read from the database log but not yet recorded or forwarded. Defaults to 0. Mean the feature is not enabled | 0 | long
 | *messageKeyColumns* (sqlserver) | A semicolon-separated list of expressions that match fully-qualified tables and column(s) to be used as message key. Each expression must match the pattern ':',where the table names could be defined as (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on the specific connector,and the key columns are a comma-separated list of columns representing the custom key. For any table without an explicit key configuration the table's primary key colum [...]
@@ -112,9 +116,9 @@ The Debezium SQL Server Connector component supports 69 options, which are liste
 | *snapshotLockTimeoutMs* (sqlserver) | The maximum number of millis to wait for table locks at the beginning of a snapshot. If locks cannot be acquired in this time frame, the snapshot will be aborted. Defaults to 10 seconds | 10s | long
 | *snapshotMaxThreads* (sqlserver) | The maximum number of threads used to perform the snapshot. Defaults to 1. | 1 | int
 | *snapshotMode* (sqlserver) | The criteria for running a snapshot upon startup of the connector. Options include: 'initial' (the default) to specify the connector should run a snapshot only when no offsets are available for the logical server name; 'schema_only' to specify the connector should run a snapshot of the schema when no offsets are available for the logical server name. | initial | String
-| *snapshotSelectStatement{zwsp}Overrides* (sqlserver) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on thespecific connectors . Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The [...]
+| *snapshotSelectStatement{zwsp}Overrides* (sqlserver) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on thespecific connectors. Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The  [...]
 | *sourceStructVersion* (sqlserver) | A version of the format of the publicly visible source part in the message | v2 | String
-| *sourceTimestampMode* (sqlserver) | Configures the criteria of the attached timestamp within the source record (ts_ms).Options include:'commit', (default) the source timestamp is set to the instant where the record was committed in the database'processing', the source timestamp is set to the instant where the record was processed by Debezium. | commit | String
+| *sourceTimestampMode* (sqlserver) | Configures the criteria of the attached timestamp within the source record (ts_ms). Options include: 'commit' (default), the source timestamp is set to the instant where the record was committed in the database; 'processing' (deprecated), the source timestamp is set to the instant where the record was processed by Debezium. | commit | String
 | *tableBlacklist* (sqlserver) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring (deprecated, use table.exclude.list instead) |  | String
 | *tableExcludeList* (sqlserver) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring |  | String
 | *tableIgnoreBuiltin* (sqlserver) | Flag specifying whether built-in tables should be ignored. | true | boolean
@@ -145,7 +149,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (69 parameters):
+=== Query Parameters (73 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -179,6 +183,9 @@ with the following path and query parameters:
 | *databaseHistoryKafkaRecovery{zwsp}Attempts* (sqlserver) | The number of attempts in a row that no data are returned from Kafka before recover completes. The maximum amount of time to wait after receiving no data is (recovery.attempts) x (recovery.poll.interval.ms). | 100 | int
 | *databaseHistoryKafkaRecovery{zwsp}PollIntervalMs* (sqlserver) | The number of milliseconds to wait while polling for persisted data during recovery. | 100ms | int
 | *databaseHistoryKafkaTopic* (sqlserver) | The name of the topic for the database schema history |  | String
+| *databaseHistorySkipUnparseable{zwsp}Ddl* (sqlserver) | Controls the action Debezium will take when it meets a DDL statement in the binlog that it cannot parse. By default the connector will stop operating, but by changing the setting it can ignore the statements which it cannot parse. If skipping is enabled then Debezium can miss metadata changes. | false | boolean
+| *databaseHistoryStoreOnly{zwsp}CapturedTablesDdl* (sqlserver) | Controls what DDL will Debezium store in database history. By default (false) Debezium will store all incoming DDL statements. If set to true, then only DDL that manipulates a captured table will be stored. | false | boolean
+| *databaseHistoryStoreOnly{zwsp}MonitoredTablesDdl* (sqlserver) | Controls what DDL will Debezium store in database history. By default (false) Debezium will store all incoming DDL statements. If set to true, then only DDL that manipulates a monitored table will be stored (deprecated, use database.history.store.only.captured.tables.ddl instead) | false | boolean
 | *databaseHostname* (sqlserver) | Resolvable hostname or IP address of the database server. |  | String
 | *databaseInstance* (sqlserver) | The SQL Server instance name |  | String
 | *databasePassword* (sqlserver) | *Required* Password of the database user to be used when connecting to the database. |  | String
@@ -193,6 +200,7 @@ with the following path and query parameters:
 | *heartbeatTopicsPrefix* (sqlserver) | The prefix that is used to name heartbeat topics.Defaults to __debezium-heartbeat. | __debezium-heartbeat | String
 | *includeSchemaChanges* (sqlserver) | Whether the connector should publish changes in the database schema to a Kafka topic with the same name as the database server ID. Each schema change will be recorded using a key that contains the database name and whose value include logical description of the new schema and optionally the DDL statement(s).The default is 'true'. This is independent of how the connector internally records database history. | true | boolean
 | *maxBatchSize* (sqlserver) | Maximum size of each batch of source records. Defaults to 2048. | 2048 | int
+| *maxIterationTransactions* (sqlserver) | This property can be used to reduce the connector memory usage footprint when changes are streamed from multiple tables per database. | 0 | int
 | *maxQueueSize* (sqlserver) | Maximum size of the queue for change events read from the database log but not yet recorded or forwarded. Defaults to 8192, and should always be larger than the maximum batch size. | 8192 | int
 | *maxQueueSizeInBytes* (sqlserver) | Maximum size of the queue in bytes for change events read from the database log but not yet recorded or forwarded. Defaults to 0. Mean the feature is not enabled | 0 | long
 | *messageKeyColumns* (sqlserver) | A semicolon-separated list of expressions that match fully-qualified tables and column(s) to be used as message key. Each expression must match the pattern ':',where the table names could be defined as (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on the specific connector,and the key columns are a comma-separated list of columns representing the custom key. For any table without an explicit key configuration the table's primary key colum [...]
@@ -210,9 +218,9 @@ with the following path and query parameters:
 | *snapshotLockTimeoutMs* (sqlserver) | The maximum number of millis to wait for table locks at the beginning of a snapshot. If locks cannot be acquired in this time frame, the snapshot will be aborted. Defaults to 10 seconds | 10s | long
 | *snapshotMaxThreads* (sqlserver) | The maximum number of threads used to perform the snapshot. Defaults to 1. | 1 | int
 | *snapshotMode* (sqlserver) | The criteria for running a snapshot upon startup of the connector. Options include: 'initial' (the default) to specify the connector should run a snapshot only when no offsets are available for the logical server name; 'schema_only' to specify the connector should run a snapshot of the schema when no offsets are available for the logical server name. | initial | String
-| *snapshotSelectStatement{zwsp}Overrides* (sqlserver) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on thespecific connectors . Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The [...]
+| *snapshotSelectStatement{zwsp}Overrides* (sqlserver) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on thespecific connectors. Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The  [...]
 | *sourceStructVersion* (sqlserver) | A version of the format of the publicly visible source part in the message | v2 | String
-| *sourceTimestampMode* (sqlserver) | Configures the criteria of the attached timestamp within the source record (ts_ms).Options include:'commit', (default) the source timestamp is set to the instant where the record was committed in the database'processing', the source timestamp is set to the instant where the record was processed by Debezium. | commit | String
+| *sourceTimestampMode* (sqlserver) | Configures the criteria of the attached timestamp within the source record (ts_ms). Options include: 'commit' (default), the source timestamp is set to the instant where the record was committed in the database; 'processing' (deprecated), the source timestamp is set to the instant where the record was processed by Debezium. | commit | String
 | *tableBlacklist* (sqlserver) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring (deprecated, use table.exclude.list instead) |  | String
 | *tableExcludeList* (sqlserver) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring |  | String
 | *tableIgnoreBuiltin* (sqlserver) | Flag specifying whether built-in tables should be ignored. | true | boolean
diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DebeziumMysqlComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DebeziumMysqlComponentBuilderFactory.java
index 1a91c2c..e71d080 100644
--- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DebeziumMysqlComponentBuilderFactory.java
+++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DebeziumMysqlComponentBuilderFactory.java
@@ -612,6 +612,64 @@ public interface DebeziumMysqlComponentBuilderFactory {
             return this;
         }
         /**
+         * Controls the action Debezium will take when it meets a DDL statement
+         * in binlog, that it cannot parse. By default the connector will stop
+         * operating but by changing the setting it can ignore the statements
+         * which it cannot parse. If skipping is enabled then Debezium can miss
+         * metadata changes.
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: mysql
+         * 
+         * @param databaseHistorySkipUnparseableDdl the value to set
+         * @return the dsl builder
+         */
+        default DebeziumMysqlComponentBuilder databaseHistorySkipUnparseableDdl(
+                boolean databaseHistorySkipUnparseableDdl) {
+            doSetProperty("databaseHistorySkipUnparseableDdl", databaseHistorySkipUnparseableDdl);
+            return this;
+        }
+        /**
+         * Controls what DDL will Debezium store in database history. By default
+         * (false) Debezium will store all incoming DDL statements. If set to
+         * true, then only DDL that manipulates a captured table will be stored.
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: mysql
+         * 
+         * @param databaseHistoryStoreOnlyCapturedTablesDdl the value to set
+         * @return the dsl builder
+         */
+        default DebeziumMysqlComponentBuilder databaseHistoryStoreOnlyCapturedTablesDdl(
+                boolean databaseHistoryStoreOnlyCapturedTablesDdl) {
+            doSetProperty("databaseHistoryStoreOnlyCapturedTablesDdl", databaseHistoryStoreOnlyCapturedTablesDdl);
+            return this;
+        }
+        /**
+         * Controls what DDL will Debezium store in database history. By default
+         * (false) Debezium will store all incoming DDL statements. If set to
+         * true, then only DDL that manipulates a monitored table will be stored
+         * (deprecated, use database.history.store.only.captured.tables.ddl
+         * instead).
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: mysql
+         * 
+         * @param databaseHistoryStoreOnlyMonitoredTablesDdl the value to set
+         * @return the dsl builder
+         */
+        default DebeziumMysqlComponentBuilder databaseHistoryStoreOnlyMonitoredTablesDdl(
+                boolean databaseHistoryStoreOnlyMonitoredTablesDdl) {
+            doSetProperty("databaseHistoryStoreOnlyMonitoredTablesDdl", databaseHistoryStoreOnlyMonitoredTablesDdl);
+            return this;
+        }
+        /**
          * Resolvable hostname or IP address of the database server.
          * 
          * The option is a: &lt;code&gt;java.lang.String&lt;/code&gt; type.
@@ -1128,6 +1186,22 @@ public interface DebeziumMysqlComponentBuilderFactory {
             return this;
         }
         /**
+         * The maximum size of chunk for incremental snapshotting.
+         * 
+         * The option is a: &lt;code&gt;int&lt;/code&gt; type.
+         * 
+         * Default: 1024
+         * Group: mysql
+         * 
+         * @param incrementalSnapshotChunkSize the value to set
+         * @return the dsl builder
+         */
+        default DebeziumMysqlComponentBuilder incrementalSnapshotChunkSize(
+                int incrementalSnapshotChunkSize) {
+            doSetProperty("incrementalSnapshotChunkSize", incrementalSnapshotChunkSize);
+            return this;
+        }
+        /**
          * Maximum size of each batch of source records. Defaults to 2048.
          * 
          * The option is a: &lt;code&gt;int&lt;/code&gt; type.
@@ -1486,7 +1560,7 @@ public interface DebeziumMysqlComponentBuilderFactory {
         /**
          * This property contains a comma-separated list of fully-qualified
          * tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on
-         * thespecific connectors . Select statements for the individual tables
+         * the specific connectors. Select statements for the individual tables
          * are specified in further configuration properties, one for each
          * table, identified by the id
          * 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or
@@ -1704,6 +1778,9 @@ public interface DebeziumMysqlComponentBuilderFactory {
             case "databaseHistoryKafkaRecoveryAttempts": getOrCreateConfiguration((DebeziumMySqlComponent) component).setDatabaseHistoryKafkaRecoveryAttempts((int) value); return true;
             case "databaseHistoryKafkaRecoveryPollIntervalMs": getOrCreateConfiguration((DebeziumMySqlComponent) component).setDatabaseHistoryKafkaRecoveryPollIntervalMs((int) value); return true;
             case "databaseHistoryKafkaTopic": getOrCreateConfiguration((DebeziumMySqlComponent) component).setDatabaseHistoryKafkaTopic((java.lang.String) value); return true;
+            case "databaseHistorySkipUnparseableDdl": getOrCreateConfiguration((DebeziumMySqlComponent) component).setDatabaseHistorySkipUnparseableDdl((boolean) value); return true;
+            case "databaseHistoryStoreOnlyCapturedTablesDdl": getOrCreateConfiguration((DebeziumMySqlComponent) component).setDatabaseHistoryStoreOnlyCapturedTablesDdl((boolean) value); return true;
+            case "databaseHistoryStoreOnlyMonitoredTablesDdl": getOrCreateConfiguration((DebeziumMySqlComponent) component).setDatabaseHistoryStoreOnlyMonitoredTablesDdl((boolean) value); return true;
             case "databaseHostname": getOrCreateConfiguration((DebeziumMySqlComponent) component).setDatabaseHostname((java.lang.String) value); return true;
             case "databaseIncludeList": getOrCreateConfiguration((DebeziumMySqlComponent) component).setDatabaseIncludeList((java.lang.String) value); return true;
             case "databaseInitialStatements": getOrCreateConfiguration((DebeziumMySqlComponent) component).setDatabaseInitialStatements((java.lang.String) value); return true;
@@ -1732,6 +1809,7 @@ public interface DebeziumMysqlComponentBuilderFactory {
             case "includeQuery": getOrCreateConfiguration((DebeziumMySqlComponent) component).setIncludeQuery((boolean) value); return true;
             case "includeSchemaChanges": getOrCreateConfiguration((DebeziumMySqlComponent) component).setIncludeSchemaChanges((boolean) value); return true;
             case "inconsistentSchemaHandlingMode": getOrCreateConfiguration((DebeziumMySqlComponent) component).setInconsistentSchemaHandlingMode((java.lang.String) value); return true;
+            case "incrementalSnapshotChunkSize": getOrCreateConfiguration((DebeziumMySqlComponent) component).setIncrementalSnapshotChunkSize((int) value); return true;
             case "maxBatchSize": getOrCreateConfiguration((DebeziumMySqlComponent) component).setMaxBatchSize((int) value); return true;
             case "maxQueueSize": getOrCreateConfiguration((DebeziumMySqlComponent) component).setMaxQueueSize((int) value); return true;
             case "maxQueueSizeInBytes": getOrCreateConfiguration((DebeziumMySqlComponent) component).setMaxQueueSizeInBytes((long) value); return true;
diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DebeziumPostgresComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DebeziumPostgresComponentBuilderFactory.java
index 111cec0..4f34e14 100644
--- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DebeziumPostgresComponentBuilderFactory.java
+++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DebeziumPostgresComponentBuilderFactory.java
@@ -819,6 +819,22 @@ public interface DebeziumPostgresComponentBuilderFactory {
             return this;
         }
         /**
+         * The maximum size of chunk for incremental snapshotting.
+         * 
+         * The option is a: &lt;code&gt;int&lt;/code&gt; type.
+         * 
+         * Default: 1024
+         * Group: postgres
+         * 
+         * @param incrementalSnapshotChunkSize the value to set
+         * @return the dsl builder
+         */
+        default DebeziumPostgresComponentBuilder incrementalSnapshotChunkSize(
+                int incrementalSnapshotChunkSize) {
+            doSetProperty("incrementalSnapshotChunkSize", incrementalSnapshotChunkSize);
+            return this;
+        }
+        /**
          * Specify how INTERVAL columns should be represented in change events,
          * including:'string' represents values as an exact ISO formatted
          * string'numeric' (default) represents values using the inexact
@@ -1380,9 +1396,8 @@ public interface DebeziumPostgresComponentBuilderFactory {
          * would normally start emitting changes;'never' to specify the
          * connector should never run a snapshot and that upon first startup the
          * connector should read from the last position (LSN) recorded by the
-         * server; and'exported' to specify the connector should run a snapshot
-         * based on the position when the replication slot was created; 'custom'
-         * to specify a custom class with 'snapshot.custom_class' which will be
+         * server; and 'exported' deprecated, use 'initial' instead; 'custom' to
+         * specify a custom class with 'snapshot.custom_class' which will be
          * loaded and used to determine the snapshot, see docs for more details.
          * 
          * The option is a: &lt;code&gt;java.lang.String&lt;/code&gt; type.
@@ -1401,7 +1416,7 @@ public interface DebeziumPostgresComponentBuilderFactory {
         /**
          * This property contains a comma-separated list of fully-qualified
          * tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on
-         * thespecific connectors . Select statements for the individual tables
+         * the specific connectors. Select statements for the individual tables
          * are specified in further configuration properties, one for each
          * table, identified by the id
          * 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or
@@ -1691,6 +1706,7 @@ public interface DebeziumPostgresComponentBuilderFactory {
             case "heartbeatTopicsPrefix": getOrCreateConfiguration((DebeziumPostgresComponent) component).setHeartbeatTopicsPrefix((java.lang.String) value); return true;
             case "hstoreHandlingMode": getOrCreateConfiguration((DebeziumPostgresComponent) component).setHstoreHandlingMode((java.lang.String) value); return true;
             case "includeUnknownDatatypes": getOrCreateConfiguration((DebeziumPostgresComponent) component).setIncludeUnknownDatatypes((boolean) value); return true;
+            case "incrementalSnapshotChunkSize": getOrCreateConfiguration((DebeziumPostgresComponent) component).setIncrementalSnapshotChunkSize((int) value); return true;
             case "intervalHandlingMode": getOrCreateConfiguration((DebeziumPostgresComponent) component).setIntervalHandlingMode((java.lang.String) value); return true;
             case "maxBatchSize": getOrCreateConfiguration((DebeziumPostgresComponent) component).setMaxBatchSize((int) value); return true;
             case "maxQueueSize": getOrCreateConfiguration((DebeziumPostgresComponent) component).setMaxQueueSize((int) value); return true;
diff --git a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DebeziumSqlserverComponentBuilderFactory.java b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DebeziumSqlserverComponentBuilderFactory.java
index dffa961..922063a 100644
--- a/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DebeziumSqlserverComponentBuilderFactory.java
+++ b/core/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DebeziumSqlserverComponentBuilderFactory.java
@@ -537,6 +537,64 @@ public interface DebeziumSqlserverComponentBuilderFactory {
             return this;
         }
         /**
+         * Controls the action Debezium will take when it meets a DDL statement
+         * in binlog, that it cannot parse. By default the connector will stop
+         * operating but by changing the setting it can ignore the statements
+         * which it cannot parse. If skipping is enabled then Debezium can miss
+         * metadata changes.
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: sqlserver
+         * 
+         * @param databaseHistorySkipUnparseableDdl the value to set
+         * @return the dsl builder
+         */
+        default DebeziumSqlserverComponentBuilder databaseHistorySkipUnparseableDdl(
+                boolean databaseHistorySkipUnparseableDdl) {
+            doSetProperty("databaseHistorySkipUnparseableDdl", databaseHistorySkipUnparseableDdl);
+            return this;
+        }
+        /**
+         * Controls what DDL will Debezium store in database history. By default
+         * (false) Debezium will store all incoming DDL statements. If set to
+         * true, then only DDL that manipulates a captured table will be stored.
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: sqlserver
+         * 
+         * @param databaseHistoryStoreOnlyCapturedTablesDdl the value to set
+         * @return the dsl builder
+         */
+        default DebeziumSqlserverComponentBuilder databaseHistoryStoreOnlyCapturedTablesDdl(
+                boolean databaseHistoryStoreOnlyCapturedTablesDdl) {
+            doSetProperty("databaseHistoryStoreOnlyCapturedTablesDdl", databaseHistoryStoreOnlyCapturedTablesDdl);
+            return this;
+        }
+        /**
+         * Controls what DDL will Debezium store in database history. By default
+         * (false) Debezium will store all incoming DDL statements. If set to
+         * true, then only DDL that manipulates a monitored table will be stored
+         * (deprecated, use database.history.store.only.captured.tables.ddl
+         * instead).
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: sqlserver
+         * 
+         * @param databaseHistoryStoreOnlyMonitoredTablesDdl the value to set
+         * @return the dsl builder
+         */
+        default DebeziumSqlserverComponentBuilder databaseHistoryStoreOnlyMonitoredTablesDdl(
+                boolean databaseHistoryStoreOnlyMonitoredTablesDdl) {
+            doSetProperty("databaseHistoryStoreOnlyMonitoredTablesDdl", databaseHistoryStoreOnlyMonitoredTablesDdl);
+            return this;
+        }
+        /**
          * Resolvable hostname or IP address of the database server.
          * 
          * The option is a: &lt;code&gt;java.lang.String&lt;/code&gt; type.
@@ -781,6 +839,24 @@ public interface DebeziumSqlserverComponentBuilderFactory {
             return this;
         }
         /**
+         * This property can be used to reduce the connector memory usage
+         * footprint when changes are streamed from multiple tables per
+         * database.
+         * 
+         * The option is a: &lt;code&gt;int&lt;/code&gt; type.
+         * 
+         * Default: 0
+         * Group: sqlserver
+         * 
+         * @param maxIterationTransactions the value to set
+         * @return the dsl builder
+         */
+        default DebeziumSqlserverComponentBuilder maxIterationTransactions(
+                int maxIterationTransactions) {
+            doSetProperty("maxIterationTransactions", maxIterationTransactions);
+            return this;
+        }
+        /**
          * Maximum size of the queue for change events read from the database
          * log but not yet recorded or forwarded. Defaults to 8192, and should
          * always be larger than the maximum batch size.
@@ -1093,7 +1169,7 @@ public interface DebeziumSqlserverComponentBuilderFactory {
         /**
          * This property contains a comma-separated list of fully-qualified
          * tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on
-         * thespecific connectors . Select statements for the individual tables
+         * the specific connectors. Select statements for the individual tables
          * are specified in further configuration properties, one for each
          * table, identified by the id
          * 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or
@@ -1137,8 +1213,8 @@ public interface DebeziumSqlserverComponentBuilderFactory {
          * Configures the criteria of the attached timestamp within the source
          * record (ts_ms).Options include:'commit', (default) the source
          * timestamp is set to the instant where the record was committed in the
-         * database'processing', the source timestamp is set to the instant
-         * where the record was processed by Debezium.
+         * database; 'processing', (deprecated) the source timestamp is set to the
+         * instant where the record was processed by Debezium.
          * 
          * The option is a: &lt;code&gt;java.lang.String&lt;/code&gt; type.
          * 
@@ -1328,6 +1404,9 @@ public interface DebeziumSqlserverComponentBuilderFactory {
             case "databaseHistoryKafkaRecoveryAttempts": getOrCreateConfiguration((DebeziumSqlserverComponent) component).setDatabaseHistoryKafkaRecoveryAttempts((int) value); return true;
             case "databaseHistoryKafkaRecoveryPollIntervalMs": getOrCreateConfiguration((DebeziumSqlserverComponent) component).setDatabaseHistoryKafkaRecoveryPollIntervalMs((int) value); return true;
             case "databaseHistoryKafkaTopic": getOrCreateConfiguration((DebeziumSqlserverComponent) component).setDatabaseHistoryKafkaTopic((java.lang.String) value); return true;
+            case "databaseHistorySkipUnparseableDdl": getOrCreateConfiguration((DebeziumSqlserverComponent) component).setDatabaseHistorySkipUnparseableDdl((boolean) value); return true;
+            case "databaseHistoryStoreOnlyCapturedTablesDdl": getOrCreateConfiguration((DebeziumSqlserverComponent) component).setDatabaseHistoryStoreOnlyCapturedTablesDdl((boolean) value); return true;
+            case "databaseHistoryStoreOnlyMonitoredTablesDdl": getOrCreateConfiguration((DebeziumSqlserverComponent) component).setDatabaseHistoryStoreOnlyMonitoredTablesDdl((boolean) value); return true;
             case "databaseHostname": getOrCreateConfiguration((DebeziumSqlserverComponent) component).setDatabaseHostname((java.lang.String) value); return true;
             case "databaseInstance": getOrCreateConfiguration((DebeziumSqlserverComponent) component).setDatabaseInstance((java.lang.String) value); return true;
             case "databasePassword": getOrCreateConfiguration((DebeziumSqlserverComponent) component).setDatabasePassword((java.lang.String) value); return true;
@@ -1342,6 +1421,7 @@ public interface DebeziumSqlserverComponentBuilderFactory {
             case "heartbeatTopicsPrefix": getOrCreateConfiguration((DebeziumSqlserverComponent) component).setHeartbeatTopicsPrefix((java.lang.String) value); return true;
             case "includeSchemaChanges": getOrCreateConfiguration((DebeziumSqlserverComponent) component).setIncludeSchemaChanges((boolean) value); return true;
             case "maxBatchSize": getOrCreateConfiguration((DebeziumSqlserverComponent) component).setMaxBatchSize((int) value); return true;
+            case "maxIterationTransactions": getOrCreateConfiguration((DebeziumSqlserverComponent) component).setMaxIterationTransactions((int) value); return true;
             case "maxQueueSize": getOrCreateConfiguration((DebeziumSqlserverComponent) component).setMaxQueueSize((int) value); return true;
             case "maxQueueSizeInBytes": getOrCreateConfiguration((DebeziumSqlserverComponent) component).setMaxQueueSizeInBytes((long) value); return true;
             case "messageKeyColumns": getOrCreateConfiguration((DebeziumSqlserverComponent) component).setMessageKeyColumns((java.lang.String) value); return true;
diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DebeziumMySqlEndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DebeziumMySqlEndpointBuilderFactory.java
index 24e2841..794a825 100644
--- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DebeziumMySqlEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DebeziumMySqlEndpointBuilderFactory.java
@@ -791,6 +791,125 @@ public interface DebeziumMySqlEndpointBuilderFactory {
             return this;
         }
         /**
+         * Controls the action Debezium will take when it meets a DDL statement
+         * in binlog, that it cannot parse. By default the connector will stop
+         * operating but by changing the setting it can ignore the statements
+         * which it cannot parse. If skipping is enabled then Debezium can miss
+         * metadata changes.
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: mysql
+         * 
+         * @param databaseHistorySkipUnparseableDdl the value to set
+         * @return the dsl builder
+         */
+        default DebeziumMySqlEndpointBuilder databaseHistorySkipUnparseableDdl(
+                boolean databaseHistorySkipUnparseableDdl) {
+            doSetProperty("databaseHistorySkipUnparseableDdl", databaseHistorySkipUnparseableDdl);
+            return this;
+        }
+        /**
+         * Controls the action Debezium will take when it meets a DDL statement
+         * in binlog, that it cannot parse. By default the connector will stop
+         * operating but by changing the setting it can ignore the statements
+         * which it cannot parse. If skipping is enabled then Debezium can miss
+         * metadata changes.
+         * 
+         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
+         * type.
+         * 
+         * Default: false
+         * Group: mysql
+         * 
+         * @param databaseHistorySkipUnparseableDdl the value to set
+         * @return the dsl builder
+         */
+        default DebeziumMySqlEndpointBuilder databaseHistorySkipUnparseableDdl(
+                String databaseHistorySkipUnparseableDdl) {
+            doSetProperty("databaseHistorySkipUnparseableDdl", databaseHistorySkipUnparseableDdl);
+            return this;
+        }
+        /**
+         * Controls what DDL will Debezium store in database history. By default
+         * (false) Debezium will store all incoming DDL statements. If set to
+         * true, then only DDL that manipulates a captured table will be stored.
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: mysql
+         * 
+         * @param databaseHistoryStoreOnlyCapturedTablesDdl the value to set
+         * @return the dsl builder
+         */
+        default DebeziumMySqlEndpointBuilder databaseHistoryStoreOnlyCapturedTablesDdl(
+                boolean databaseHistoryStoreOnlyCapturedTablesDdl) {
+            doSetProperty("databaseHistoryStoreOnlyCapturedTablesDdl", databaseHistoryStoreOnlyCapturedTablesDdl);
+            return this;
+        }
+        /**
+         * Controls what DDL will Debezium store in database history. By default
+         * (false) Debezium will store all incoming DDL statements. If set to
+         * true, then only DDL that manipulates a captured table will be stored.
+         * 
+         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
+         * type.
+         * 
+         * Default: false
+         * Group: mysql
+         * 
+         * @param databaseHistoryStoreOnlyCapturedTablesDdl the value to set
+         * @return the dsl builder
+         */
+        default DebeziumMySqlEndpointBuilder databaseHistoryStoreOnlyCapturedTablesDdl(
+                String databaseHistoryStoreOnlyCapturedTablesDdl) {
+            doSetProperty("databaseHistoryStoreOnlyCapturedTablesDdl", databaseHistoryStoreOnlyCapturedTablesDdl);
+            return this;
+        }
+        /**
+         * Controls what DDL will Debezium store in database history. By default
+         * (false) Debezium will store all incoming DDL statements. If set to
+         * true, then only DDL that manipulates a monitored table will be stored
+         * (deprecated, use database.history.store.only.captured.tables.ddl
+         * instead).
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: mysql
+         * 
+         * @param databaseHistoryStoreOnlyMonitoredTablesDdl the value to set
+         * @return the dsl builder
+         */
+        default DebeziumMySqlEndpointBuilder databaseHistoryStoreOnlyMonitoredTablesDdl(
+                boolean databaseHistoryStoreOnlyMonitoredTablesDdl) {
+            doSetProperty("databaseHistoryStoreOnlyMonitoredTablesDdl", databaseHistoryStoreOnlyMonitoredTablesDdl);
+            return this;
+        }
+        /**
+         * Controls what DDL will Debezium store in database history. By default
+         * (false) Debezium will store all incoming DDL statements. If set to
+         * true, then only DDL that manipulates a monitored table will be stored
+         * (deprecated, use database.history.store.only.captured.tables.ddl
+         * instead).
+         * 
+         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
+         * type.
+         * 
+         * Default: false
+         * Group: mysql
+         * 
+         * @param databaseHistoryStoreOnlyMonitoredTablesDdl the value to set
+         * @return the dsl builder
+         */
+        default DebeziumMySqlEndpointBuilder databaseHistoryStoreOnlyMonitoredTablesDdl(
+                String databaseHistoryStoreOnlyMonitoredTablesDdl) {
+            doSetProperty("databaseHistoryStoreOnlyMonitoredTablesDdl", databaseHistoryStoreOnlyMonitoredTablesDdl);
+            return this;
+        }
+        /**
          * Resolvable hostname or IP address of the database server.
          * 
          * The option is a: &lt;code&gt;java.lang.String&lt;/code&gt; type.
@@ -1463,6 +1582,38 @@ public interface DebeziumMySqlEndpointBuilderFactory {
             return this;
         }
         /**
+         * The maximum size of chunk for incremental snapshotting.
+         * 
+         * The option is a: &lt;code&gt;int&lt;/code&gt; type.
+         * 
+         * Default: 1024
+         * Group: mysql
+         * 
+         * @param incrementalSnapshotChunkSize the value to set
+         * @return the dsl builder
+         */
+        default DebeziumMySqlEndpointBuilder incrementalSnapshotChunkSize(
+                int incrementalSnapshotChunkSize) {
+            doSetProperty("incrementalSnapshotChunkSize", incrementalSnapshotChunkSize);
+            return this;
+        }
+        /**
+         * The maximum size of chunk for incremental snapshotting.
+         * 
+         * The option will be converted to a &lt;code&gt;int&lt;/code&gt; type.
+         * 
+         * Default: 1024
+         * Group: mysql
+         * 
+         * @param incrementalSnapshotChunkSize the value to set
+         * @return the dsl builder
+         */
+        default DebeziumMySqlEndpointBuilder incrementalSnapshotChunkSize(
+                String incrementalSnapshotChunkSize) {
+            doSetProperty("incrementalSnapshotChunkSize", incrementalSnapshotChunkSize);
+            return this;
+        }
+        /**
          * Maximum size of each batch of source records. Defaults to 2048.
          * 
          * The option is a: &lt;code&gt;int&lt;/code&gt; type.
@@ -2023,7 +2174,7 @@ public interface DebeziumMySqlEndpointBuilderFactory {
         /**
          * This property contains a comma-separated list of fully-qualified
          * tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on
-         * thespecific connectors . Select statements for the individual tables
+         * the specific connectors. Select statements for the individual tables
          * are specified in further configuration properties, one for each
          * table, identified by the id
          * 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or
diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DebeziumPostgresEndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DebeziumPostgresEndpointBuilderFactory.java
index c70e4fd..a9d50b2 100644
--- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DebeziumPostgresEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DebeziumPostgresEndpointBuilderFactory.java
@@ -963,6 +963,38 @@ public interface DebeziumPostgresEndpointBuilderFactory {
             return this;
         }
         /**
+         * The maximum size of chunk for incremental snapshotting.
+         * 
+         * The option is a: &lt;code&gt;int&lt;/code&gt; type.
+         * 
+         * Default: 1024
+         * Group: postgres
+         * 
+         * @param incrementalSnapshotChunkSize the value to set
+         * @return the dsl builder
+         */
+        default DebeziumPostgresEndpointBuilder incrementalSnapshotChunkSize(
+                int incrementalSnapshotChunkSize) {
+            doSetProperty("incrementalSnapshotChunkSize", incrementalSnapshotChunkSize);
+            return this;
+        }
+        /**
+         * The maximum size of chunk for incremental snapshotting.
+         * 
+         * The option will be converted to a &lt;code&gt;int&lt;/code&gt; type.
+         * 
+         * Default: 1024
+         * Group: postgres
+         * 
+         * @param incrementalSnapshotChunkSize the value to set
+         * @return the dsl builder
+         */
+        default DebeziumPostgresEndpointBuilder incrementalSnapshotChunkSize(
+                String incrementalSnapshotChunkSize) {
+            doSetProperty("incrementalSnapshotChunkSize", incrementalSnapshotChunkSize);
+            return this;
+        }
+        /**
          * Specify how INTERVAL columns should be represented in change events,
          * including:'string' represents values as an exact ISO formatted
          * string'numeric' (default) represents values using the inexact
@@ -1779,9 +1811,8 @@ public interface DebeziumPostgresEndpointBuilderFactory {
          * would normally start emitting changes;'never' to specify the
          * connector should never run a snapshot and that upon first startup the
          * connector should read from the last position (LSN) recorded by the
-         * server; and'exported' to specify the connector should run a snapshot
-         * based on the position when the replication slot was created; 'custom'
-         * to specify a custom class with 'snapshot.custom_class' which will be
+         * server; and 'exported' deprecated, use 'initial' instead; 'custom' to
+         * specify a custom class with 'snapshot.custom_class' which will be
          * loaded and used to determine the snapshot, see docs for more details.
          * 
          * The option is a: &lt;code&gt;java.lang.String&lt;/code&gt; type.
@@ -1799,7 +1830,7 @@ public interface DebeziumPostgresEndpointBuilderFactory {
         /**
          * This property contains a comma-separated list of fully-qualified
          * tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on
-         * thespecific connectors . Select statements for the individual tables
+         * the specific connectors. Select statements for the individual tables
          * are specified in further configuration properties, one for each
          * table, identified by the id
          * 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or
diff --git a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DebeziumSqlserverEndpointBuilderFactory.java b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DebeziumSqlserverEndpointBuilderFactory.java
index 0799bd2..9e95aa9 100644
--- a/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DebeziumSqlserverEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DebeziumSqlserverEndpointBuilderFactory.java
@@ -645,6 +645,125 @@ public interface DebeziumSqlserverEndpointBuilderFactory {
             return this;
         }
         /**
+         * Controls the action Debezium will take when it meets a DDL statement
+         * in binlog, that it cannot parse. By default the connector will stop
+         * operating but by changing the setting it can ignore the statements
+         * which it cannot parse. If skipping is enabled then Debezium can miss
+         * metadata changes.
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: sqlserver
+         * 
+         * @param databaseHistorySkipUnparseableDdl the value to set
+         * @return the dsl builder
+         */
+        default DebeziumSqlserverEndpointBuilder databaseHistorySkipUnparseableDdl(
+                boolean databaseHistorySkipUnparseableDdl) {
+            doSetProperty("databaseHistorySkipUnparseableDdl", databaseHistorySkipUnparseableDdl);
+            return this;
+        }
+        /**
+         * Controls the action Debezium will take when it meets a DDL statement
+         * in binlog, that it cannot parse. By default the connector will stop
+         * operating but by changing the setting it can ignore the statements
+         * which it cannot parse. If skipping is enabled then Debezium can miss
+         * metadata changes.
+         * 
+         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
+         * type.
+         * 
+         * Default: false
+         * Group: sqlserver
+         * 
+         * @param databaseHistorySkipUnparseableDdl the value to set
+         * @return the dsl builder
+         */
+        default DebeziumSqlserverEndpointBuilder databaseHistorySkipUnparseableDdl(
+                String databaseHistorySkipUnparseableDdl) {
+            doSetProperty("databaseHistorySkipUnparseableDdl", databaseHistorySkipUnparseableDdl);
+            return this;
+        }
+        /**
+         * Controls what DDL will Debezium store in database history. By default
+         * (false) Debezium will store all incoming DDL statements. If set to
+         * true, then only DDL that manipulates a captured table will be stored.
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: sqlserver
+         * 
+         * @param databaseHistoryStoreOnlyCapturedTablesDdl the value to set
+         * @return the dsl builder
+         */
+        default DebeziumSqlserverEndpointBuilder databaseHistoryStoreOnlyCapturedTablesDdl(
+                boolean databaseHistoryStoreOnlyCapturedTablesDdl) {
+            doSetProperty("databaseHistoryStoreOnlyCapturedTablesDdl", databaseHistoryStoreOnlyCapturedTablesDdl);
+            return this;
+        }
+        /**
+         * Controls what DDL will Debezium store in database history. By default
+         * (false) Debezium will store all incoming DDL statements. If set to
+         * true, then only DDL that manipulates a captured table will be stored.
+         * 
+         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
+         * type.
+         * 
+         * Default: false
+         * Group: sqlserver
+         * 
+         * @param databaseHistoryStoreOnlyCapturedTablesDdl the value to set
+         * @return the dsl builder
+         */
+        default DebeziumSqlserverEndpointBuilder databaseHistoryStoreOnlyCapturedTablesDdl(
+                String databaseHistoryStoreOnlyCapturedTablesDdl) {
+            doSetProperty("databaseHistoryStoreOnlyCapturedTablesDdl", databaseHistoryStoreOnlyCapturedTablesDdl);
+            return this;
+        }
+        /**
+         * Controls what DDL will Debezium store in database history. By default
+         * (false) Debezium will store all incoming DDL statements. If set to
+         * true, then only DDL that manipulates a monitored table will be stored
+         * (deprecated, use database.history.store.only.captured.tables.ddl
+         * instead).
+         * 
+         * The option is a: &lt;code&gt;boolean&lt;/code&gt; type.
+         * 
+         * Default: false
+         * Group: sqlserver
+         * 
+         * @param databaseHistoryStoreOnlyMonitoredTablesDdl the value to set
+         * @return the dsl builder
+         */
+        default DebeziumSqlserverEndpointBuilder databaseHistoryStoreOnlyMonitoredTablesDdl(
+                boolean databaseHistoryStoreOnlyMonitoredTablesDdl) {
+            doSetProperty("databaseHistoryStoreOnlyMonitoredTablesDdl", databaseHistoryStoreOnlyMonitoredTablesDdl);
+            return this;
+        }
+        /**
+         * Controls what DDL will Debezium store in database history. By default
+         * (false) Debezium will store all incoming DDL statements. If set to
+         * true, then only DDL that manipulates a monitored table will be stored
+         * (deprecated, use database.history.store.only.captured.tables.ddl
+         * instead).
+         * 
+         * The option will be converted to a &lt;code&gt;boolean&lt;/code&gt;
+         * type.
+         * 
+         * Default: false
+         * Group: sqlserver
+         * 
+         * @param databaseHistoryStoreOnlyMonitoredTablesDdl the value to set
+         * @return the dsl builder
+         */
+        default DebeziumSqlserverEndpointBuilder databaseHistoryStoreOnlyMonitoredTablesDdl(
+                String databaseHistoryStoreOnlyMonitoredTablesDdl) {
+            doSetProperty("databaseHistoryStoreOnlyMonitoredTablesDdl", databaseHistoryStoreOnlyMonitoredTablesDdl);
+            return this;
+        }
+        /**
          * Resolvable hostname or IP address of the database server.
          * 
          * The option is a: &lt;code&gt;java.lang.String&lt;/code&gt; type.
@@ -963,6 +1082,42 @@ public interface DebeziumSqlserverEndpointBuilderFactory {
             return this;
         }
         /**
+         * This property can be used to reduce the connector memory usage
+         * footprint when changes are streamed from multiple tables per
+         * database.
+         * 
+         * The option is a: &lt;code&gt;int&lt;/code&gt; type.
+         * 
+         * Default: 0
+         * Group: sqlserver
+         * 
+         * @param maxIterationTransactions the value to set
+         * @return the dsl builder
+         */
+        default DebeziumSqlserverEndpointBuilder maxIterationTransactions(
+                int maxIterationTransactions) {
+            doSetProperty("maxIterationTransactions", maxIterationTransactions);
+            return this;
+        }
+        /**
+         * This property can be used to reduce the connector memory usage
+         * footprint when changes are streamed from multiple tables per
+         * database.
+         * 
+         * The option will be converted to a &lt;code&gt;int&lt;/code&gt; type.
+         * 
+         * Default: 0
+         * Group: sqlserver
+         * 
+         * @param maxIterationTransactions the value to set
+         * @return the dsl builder
+         */
+        default DebeziumSqlserverEndpointBuilder maxIterationTransactions(
+                String maxIterationTransactions) {
+            doSetProperty("maxIterationTransactions", maxIterationTransactions);
+            return this;
+        }
+        /**
          * Maximum size of the queue for change events read from the database
          * log but not yet recorded or forwarded. Defaults to 8192, and should
          * always be larger than the maximum batch size.
@@ -1464,7 +1619,7 @@ public interface DebeziumSqlserverEndpointBuilderFactory {
         /**
          * This property contains a comma-separated list of fully-qualified
          * tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on
-         * thespecific connectors . Select statements for the individual tables
+         * the specific connectors. Select statements for the individual tables
          * are specified in further configuration properties, one for each
          * table, identified by the id
          * 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or
@@ -1508,8 +1663,8 @@ public interface DebeziumSqlserverEndpointBuilderFactory {
          * Configures the criteria of the attached timestamp within the source
          * record (ts_ms).Options include:'commit', (default) the source
          * timestamp is set to the instant where the record was committed in the
-         * database'processing', the source timestamp is set to the instant
-         * where the record was processed by Debezium.
+         * database; 'processing', (deprecated) the source timestamp is set to the
+         * instant where the record was processed by Debezium.
          * 
          * The option is a: &lt;code&gt;java.lang.String&lt;/code&gt; type.
          * 
diff --git a/docs/components/modules/ROOT/pages/debezium-mysql-component.adoc b/docs/components/modules/ROOT/pages/debezium-mysql-component.adoc
index 4a17173..78dde40 100644
--- a/docs/components/modules/ROOT/pages/debezium-mysql-component.adoc
+++ b/docs/components/modules/ROOT/pages/debezium-mysql-component.adoc
@@ -58,7 +58,7 @@ debezium-mysql:name[?options]
 
 
 // component options: START
-The Debezium MySQL Connector component supports 88 options, which are listed below.
+The Debezium MySQL Connector component supports 92 options, which are listed below.
 
 
 
@@ -97,6 +97,9 @@ The Debezium MySQL Connector component supports 88 options, which are listed bel
 | *databaseHistoryKafkaRecovery{zwsp}Attempts* (mysql) | The number of attempts in a row that no data are returned from Kafka before recover completes. The maximum amount of time to wait after receiving no data is (recovery.attempts) x (recovery.poll.interval.ms). | 100 | int
 | *databaseHistoryKafkaRecovery{zwsp}PollIntervalMs* (mysql) | The number of milliseconds to wait while polling for persisted data during recovery. | 100ms | int
 | *databaseHistoryKafkaTopic* (mysql) | The name of the topic for the database schema history |  | String
+| *databaseHistorySkipUnparseable{zwsp}Ddl* (mysql) | Controls the action Debezium will take when it meets a DDL statement in binlog, that it cannot parse. By default the connector will stop operating but by changing the setting it can ignore the statements which it cannot parse. If skipping is enabled then Debezium can miss metadata changes. | false | boolean
+| *databaseHistoryStoreOnly{zwsp}CapturedTablesDdl* (mysql) | Controls what DDL will Debezium store in database history. By default (false) Debezium will store all incoming DDL statements. If set to true, then only DDL that manipulates a captured table will be stored. | false | boolean
+| *databaseHistoryStoreOnly{zwsp}MonitoredTablesDdl* (mysql) | Controls what DDL will Debezium store in database history. By default (false) Debezium will store all incoming DDL statements. If set to true, then only DDL that manipulates a monitored table will be stored (deprecated, use database.history.store.only.captured.tables.ddl instead) | false | boolean
 | *databaseHostname* (mysql) | Resolvable hostname or IP address of the database server. |  | String
 | *databaseIncludeList* (mysql) | The databases for which changes are to be captured |  | String
 | *databaseInitialStatements* (mysql) | A semicolon separated list of SQL statements to be executed when a JDBC connection (not binlog reading connection) to the database is established. Note that the connector may establish JDBC connections at its own discretion, so this should typically be used for configuration of session parameters only,but not for executing DML statements. Use doubled semicolon (';;') to use a semicolon as a character and not as a delimiter. |  | String
@@ -125,6 +128,7 @@ The Debezium MySQL Connector component supports 88 options, which are listed bel
 | *includeQuery* (mysql) | Whether the connector should include the original SQL query that generated the change event. Note: This option requires MySQL be configured with the binlog_rows_query_log_events option set to ON. Query will not be present for events generated from snapshot. WARNING: Enabling this option may expose tables or fields explicitly blacklisted or masked by including the original SQL statement in the change event. For this reason the default value is 'false'. | false | [...]
 | *includeSchemaChanges* (mysql) | Whether the connector should publish changes in the database schema to a Kafka topic with the same name as the database server ID. Each schema change will be recorded using a key that contains the database name and whose value include logical description of the new schema and optionally the DDL statement(s).The default is 'true'. This is independent of how the connector internally records database history. | true | boolean
 | *inconsistentSchemaHandlingMode* (mysql) | Specify how binlog events that belong to a table missing from internal schema representation (i.e. internal representation is not consistent with database) should be handled, including:'fail' (the default) an exception indicating the problematic event and its binlog position is raised, causing the connector to be stopped; 'warn' the problematic event and its binlog position will be logged and the event will be skipped;'skip' the problematic ev [...]
+| *incrementalSnapshotChunkSize* (mysql) | The maximum size of chunk for incremental snapshotting | 1024 | int
 | *maxBatchSize* (mysql) | Maximum size of each batch of source records. Defaults to 2048. | 2048 | int
 | *maxQueueSize* (mysql) | Maximum size of the queue for change events read from the database log but not yet recorded or forwarded. Defaults to 8192, and should always be larger than the maximum batch size. | 8192 | int
 | *maxQueueSizeInBytes* (mysql) | Maximum size of the queue in bytes for change events read from the database log but not yet recorded or forwarded. Defaults to 0. Mean the feature is not enabled | 0 | long
@@ -144,7 +148,7 @@ The Debezium MySQL Connector component supports 88 options, which are listed bel
 | *snapshotMaxThreads* (mysql) | The maximum number of threads used to perform the snapshot. Defaults to 1. | 1 | int
 | *snapshotMode* (mysql) | The criteria for running a snapshot upon startup of the connector. Options include: 'when_needed' to specify that the connector run a snapshot upon startup whenever it deems it necessary; 'schema_only' to only take a snapshot of the schema (table structures) but no actual data; 'initial' (the default) to specify the connector can run a snapshot only when no offsets are available for the logical server name; 'initial_only' same as 'initial' except the connector  [...]
 | *snapshotNewTables* (mysql) | BETA FEATURE: On connector restart, the connector will check if there have been any new tables added to the configuration, and snapshot them. There is presently only two options:'off': Default behavior. Do not snapshot new tables.'parallel': The snapshot of the new tables will occur in parallel to the continued binlog reading of the old tables. When the snapshot completes, an independent binlog reader will begin reading the events for the new tables until  [...]
-| *snapshotSelectStatement{zwsp}Overrides* (mysql) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on thespecific connectors . Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The val [...]
+| *snapshotSelectStatement{zwsp}Overrides* (mysql) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on the specific connectors. Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The valu [...]
 | *sourceStructVersion* (mysql) | A version of the format of the publicly visible source part in the message | v2 | String
 | *tableBlacklist* (mysql) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring (deprecated, use table.exclude.list instead) |  | String
 | *tableExcludeList* (mysql) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring |  | String
@@ -176,7 +180,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (88 parameters):
+=== Query Parameters (92 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -214,6 +218,9 @@ with the following path and query parameters:
 | *databaseHistoryKafkaRecovery{zwsp}Attempts* (mysql) | The number of attempts in a row that no data are returned from Kafka before recover completes. The maximum amount of time to wait after receiving no data is (recovery.attempts) x (recovery.poll.interval.ms). | 100 | int
 | *databaseHistoryKafkaRecovery{zwsp}PollIntervalMs* (mysql) | The number of milliseconds to wait while polling for persisted data during recovery. | 100ms | int
 | *databaseHistoryKafkaTopic* (mysql) | The name of the topic for the database schema history |  | String
+| *databaseHistorySkipUnparseable{zwsp}Ddl* (mysql) | Controls the action Debezium will take when it meets a DDL statement in binlog, that it cannot parse. By default the connector will stop operating but by changing the setting it can ignore the statements which it cannot parse. If skipping is enabled then Debezium can miss metadata changes. | false | boolean
+| *databaseHistoryStoreOnly{zwsp}CapturedTablesDdl* (mysql) | Controls what DDL will Debezium store in database history. By default (false) Debezium will store all incoming DDL statements. If set to true, then only DDL that manipulates a captured table will be stored. | false | boolean
+| *databaseHistoryStoreOnly{zwsp}MonitoredTablesDdl* (mysql) | Controls what DDL will Debezium store in database history. By default (false) Debezium will store all incoming DDL statements. If set to true, then only DDL that manipulates a monitored table will be stored (deprecated, use database.history.store.only.captured.tables.ddl instead) | false | boolean
 | *databaseHostname* (mysql) | Resolvable hostname or IP address of the database server. |  | String
 | *databaseIncludeList* (mysql) | The databases for which changes are to be captured |  | String
 | *databaseInitialStatements* (mysql) | A semicolon separated list of SQL statements to be executed when a JDBC connection (not binlog reading connection) to the database is established. Note that the connector may establish JDBC connections at its own discretion, so this should typically be used for configuration of session parameters only,but not for executing DML statements. Use doubled semicolon (';;') to use a semicolon as a character and not as a delimiter. |  | String
@@ -242,6 +249,7 @@ with the following path and query parameters:
 | *includeQuery* (mysql) | Whether the connector should include the original SQL query that generated the change event. Note: This option requires MySQL be configured with the binlog_rows_query_log_events option set to ON. Query will not be present for events generated from snapshot. WARNING: Enabling this option may expose tables or fields explicitly blacklisted or masked by including the original SQL statement in the change event. For this reason the default value is 'false'. | false | [...]
 | *includeSchemaChanges* (mysql) | Whether the connector should publish changes in the database schema to a Kafka topic with the same name as the database server ID. Each schema change will be recorded using a key that contains the database name and whose value include logical description of the new schema and optionally the DDL statement(s).The default is 'true'. This is independent of how the connector internally records database history. | true | boolean
 | *inconsistentSchemaHandlingMode* (mysql) | Specify how binlog events that belong to a table missing from internal schema representation (i.e. internal representation is not consistent with database) should be handled, including:'fail' (the default) an exception indicating the problematic event and its binlog position is raised, causing the connector to be stopped; 'warn' the problematic event and its binlog position will be logged and the event will be skipped;'skip' the problematic ev [...]
+| *incrementalSnapshotChunkSize* (mysql) | The maximum size of chunk for incremental snapshotting | 1024 | int
 | *maxBatchSize* (mysql) | Maximum size of each batch of source records. Defaults to 2048. | 2048 | int
 | *maxQueueSize* (mysql) | Maximum size of the queue for change events read from the database log but not yet recorded or forwarded. Defaults to 8192, and should always be larger than the maximum batch size. | 8192 | int
 | *maxQueueSizeInBytes* (mysql) | Maximum size of the queue in bytes for change events read from the database log but not yet recorded or forwarded. Defaults to 0. Mean the feature is not enabled | 0 | long
@@ -261,7 +269,7 @@ with the following path and query parameters:
 | *snapshotMaxThreads* (mysql) | The maximum number of threads used to perform the snapshot. Defaults to 1. | 1 | int
 | *snapshotMode* (mysql) | The criteria for running a snapshot upon startup of the connector. Options include: 'when_needed' to specify that the connector run a snapshot upon startup whenever it deems it necessary; 'schema_only' to only take a snapshot of the schema (table structures) but no actual data; 'initial' (the default) to specify the connector can run a snapshot only when no offsets are available for the logical server name; 'initial_only' same as 'initial' except the connector  [...]
 | *snapshotNewTables* (mysql) | BETA FEATURE: On connector restart, the connector will check if there have been any new tables added to the configuration, and snapshot them. There is presently only two options:'off': Default behavior. Do not snapshot new tables.'parallel': The snapshot of the new tables will occur in parallel to the continued binlog reading of the old tables. When the snapshot completes, an independent binlog reader will begin reading the events for the new tables until  [...]
-| *snapshotSelectStatement{zwsp}Overrides* (mysql) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on thespecific connectors . Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The val [...]
+| *snapshotSelectStatement{zwsp}Overrides* (mysql) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on the specific connectors. Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The valu [...]
 | *sourceStructVersion* (mysql) | A version of the format of the publicly visible source part in the message | v2 | String
 | *tableBlacklist* (mysql) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring (deprecated, use table.exclude.list instead) |  | String
 | *tableExcludeList* (mysql) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring |  | String
diff --git a/docs/components/modules/ROOT/pages/debezium-postgres-component.adoc b/docs/components/modules/ROOT/pages/debezium-postgres-component.adoc
index 193c7bc..18ba9d2 100644
--- a/docs/components/modules/ROOT/pages/debezium-postgres-component.adoc
+++ b/docs/components/modules/ROOT/pages/debezium-postgres-component.adoc
@@ -49,7 +49,7 @@ debezium-postgres:name[?options]
 
 
 // component options: START
-The Debezium PostgresSQL Connector component supports 88 options, which are listed below.
+The Debezium PostgresSQL Connector component supports 89 options, which are listed below.
 
 
 
@@ -100,6 +100,7 @@ The Debezium PostgresSQL Connector component supports 88 options, which are list
 | *heartbeatTopicsPrefix* (postgres) | The prefix that is used to name heartbeat topics.Defaults to __debezium-heartbeat. | __debezium-heartbeat | String
 | *hstoreHandlingMode* (postgres) | Specify how HSTORE columns should be represented in change events, including:'json' represents values as string-ified JSON (default)'map' represents values as a key/value map | json | String
 | *includeUnknownDatatypes* (postgres) | Specify whether the fields of data type not supported by Debezium should be processed:'false' (the default) omits the fields; 'true' converts the field into an implementation dependent binary representation. | false | boolean
+| *incrementalSnapshotChunkSize* (postgres) | The maximum size of chunk for incremental snapshotting | 1024 | int
 | *intervalHandlingMode* (postgres) | Specify how INTERVAL columns should be represented in change events, including:'string' represents values as an exact ISO formatted string'numeric' (default) represents values using the inexact conversion into microseconds | numeric | String
 | *maxBatchSize* (postgres) | Maximum size of each batch of source records. Defaults to 2048. | 2048 | int
 | *maxQueueSize* (postgres) | Maximum size of the queue for change events read from the database log but not yet recorded or forwarded. Defaults to 8192, and should always be larger than the maximum batch size. | 8192 | int
@@ -131,8 +132,8 @@ The Debezium PostgresSQL Connector component supports 88 options, which are list
 | *snapshotIncludeCollectionList* (postgres) | this setting must be set to specify a list of tables/collections whose snapshot must be taken on creating or restarting the connector. |  | String
 | *snapshotLockTimeoutMs* (postgres) | The maximum number of millis to wait for table locks at the beginning of a snapshot. If locks cannot be acquired in this time frame, the snapshot will be aborted. Defaults to 10 seconds | 10s | long
 | *snapshotMaxThreads* (postgres) | The maximum number of threads used to perform the snapshot. Defaults to 1. | 1 | int
-| *snapshotMode* (postgres) | The criteria for running a snapshot upon startup of the connector. Options include: 'always' to specify that the connector run a snapshot each time it starts up; 'initial' (the default) to specify the connector can run a snapshot only when no offsets are available for the logical server name; 'initial_only' same as 'initial' except the connector should stop after completing the snapshot and before it would normally start emitting changes;'never' to specify t [...]
-| *snapshotSelectStatement{zwsp}Overrides* (postgres) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on thespecific connectors . Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The  [...]
+| *snapshotMode* (postgres) | The criteria for running a snapshot upon startup of the connector. Options include: 'always' to specify that the connector run a snapshot each time it starts up; 'initial' (the default) to specify the connector can run a snapshot only when no offsets are available for the logical server name; 'initial_only' same as 'initial' except the connector should stop after completing the snapshot and before it would normally start emitting changes;'never' to specify t [...]
+| *snapshotSelectStatement{zwsp}Overrides* (postgres) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on the specific connectors. Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The v [...]
 | *sourceStructVersion* (postgres) | A version of the format of the publicly visible source part in the message | v2 | String
 | *statusUpdateIntervalMs* (postgres) | Frequency for sending replication connection status updates to the server, given in milliseconds. Defaults to 10 seconds (10,000 ms). | 10s | int
 | *tableBlacklist* (postgres) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring (deprecated, use table.exclude.list instead) |  | String
@@ -167,7 +168,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (88 parameters):
+=== Query Parameters (89 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -217,6 +218,7 @@ with the following path and query parameters:
 | *heartbeatTopicsPrefix* (postgres) | The prefix that is used to name heartbeat topics.Defaults to __debezium-heartbeat. | __debezium-heartbeat | String
 | *hstoreHandlingMode* (postgres) | Specify how HSTORE columns should be represented in change events, including:'json' represents values as string-ified JSON (default)'map' represents values as a key/value map | json | String
 | *includeUnknownDatatypes* (postgres) | Specify whether the fields of data type not supported by Debezium should be processed:'false' (the default) omits the fields; 'true' converts the field into an implementation dependent binary representation. | false | boolean
+| *incrementalSnapshotChunkSize* (postgres) | The maximum size of chunk for incremental snapshotting | 1024 | int
 | *intervalHandlingMode* (postgres) | Specify how INTERVAL columns should be represented in change events, including:'string' represents values as an exact ISO formatted string'numeric' (default) represents values using the inexact conversion into microseconds | numeric | String
 | *maxBatchSize* (postgres) | Maximum size of each batch of source records. Defaults to 2048. | 2048 | int
 | *maxQueueSize* (postgres) | Maximum size of the queue for change events read from the database log but not yet recorded or forwarded. Defaults to 8192, and should always be larger than the maximum batch size. | 8192 | int
@@ -248,8 +250,8 @@ with the following path and query parameters:
 | *snapshotIncludeCollectionList* (postgres) | this setting must be set to specify a list of tables/collections whose snapshot must be taken on creating or restarting the connector. |  | String
 | *snapshotLockTimeoutMs* (postgres) | The maximum number of millis to wait for table locks at the beginning of a snapshot. If locks cannot be acquired in this time frame, the snapshot will be aborted. Defaults to 10 seconds | 10s | long
 | *snapshotMaxThreads* (postgres) | The maximum number of threads used to perform the snapshot. Defaults to 1. | 1 | int
-| *snapshotMode* (postgres) | The criteria for running a snapshot upon startup of the connector. Options include: 'always' to specify that the connector run a snapshot each time it starts up; 'initial' (the default) to specify the connector can run a snapshot only when no offsets are available for the logical server name; 'initial_only' same as 'initial' except the connector should stop after completing the snapshot and before it would normally start emitting changes;'never' to specify t [...]
-| *snapshotSelectStatement{zwsp}Overrides* (postgres) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on thespecific connectors . Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The  [...]
+| *snapshotMode* (postgres) | The criteria for running a snapshot upon startup of the connector. Options include: 'always' to specify that the connector run a snapshot each time it starts up; 'initial' (the default) to specify the connector can run a snapshot only when no offsets are available for the logical server name; 'initial_only' same as 'initial' except the connector should stop after completing the snapshot and before it would normally start emitting changes;'never' to specify t [...]
+| *snapshotSelectStatement{zwsp}Overrides* (postgres) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on the specific connectors. Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The v [...]
 | *sourceStructVersion* (postgres) | A version of the format of the publicly visible source part in the message | v2 | String
 | *statusUpdateIntervalMs* (postgres) | Frequency for sending replication connection status updates to the server, given in milliseconds. Defaults to 10 seconds (10,000 ms). | 10s | int
 | *tableBlacklist* (postgres) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring (deprecated, use table.exclude.list instead) |  | String
diff --git a/docs/components/modules/ROOT/pages/debezium-sqlserver-component.adoc b/docs/components/modules/ROOT/pages/debezium-sqlserver-component.adoc
index 3e355a7..ef3aa4d 100644
--- a/docs/components/modules/ROOT/pages/debezium-sqlserver-component.adoc
+++ b/docs/components/modules/ROOT/pages/debezium-sqlserver-component.adoc
@@ -48,7 +48,7 @@ debezium-sqlserver:name[?options]
 
 
 // component options: START
-The Debezium SQL Server Connector component supports 69 options, which are listed below.
+The Debezium SQL Server Connector component supports 73 options, which are listed below.
 
 
 
@@ -83,6 +83,9 @@ The Debezium SQL Server Connector component supports 69 options, which are liste
 | *databaseHistoryKafkaRecovery{zwsp}Attempts* (sqlserver) | The number of attempts in a row that no data are returned from Kafka before recover completes. The maximum amount of time to wait after receiving no data is (recovery.attempts) x (recovery.poll.interval.ms). | 100 | int
 | *databaseHistoryKafkaRecovery{zwsp}PollIntervalMs* (sqlserver) | The number of milliseconds to wait while polling for persisted data during recovery. | 100ms | int
 | *databaseHistoryKafkaTopic* (sqlserver) | The name of the topic for the database schema history |  | String
+| *databaseHistorySkipUnparseable{zwsp}Ddl* (sqlserver) | Controls the action Debezium will take when it meets a DDL statement in binlog that it cannot parse. By default the connector will stop operating, but by changing the setting it can ignore the statements which it cannot parse. If skipping is enabled then Debezium can miss metadata changes. | false | boolean
+| *databaseHistoryStoreOnly{zwsp}CapturedTablesDdl* (sqlserver) | Controls what DDL will Debezium store in database history. By default (false) Debezium will store all incoming DDL statements. If set to true, then only DDL that manipulates a captured table will be stored. | false | boolean
+| *databaseHistoryStoreOnly{zwsp}MonitoredTablesDdl* (sqlserver) | Controls what DDL will Debezium store in database history. By default (false) Debezium will store all incoming DDL statements. If set to true, then only DDL that manipulates a monitored table will be stored (deprecated, use database.history.store.only.captured.tables.ddl instead) | false | boolean
 | *databaseHostname* (sqlserver) | Resolvable hostname or IP address of the database server. |  | String
 | *databaseInstance* (sqlserver) | The SQL Server instance name |  | String
 | *databasePassword* (sqlserver) | *Required* Password of the database user to be used when connecting to the database. |  | String
@@ -97,6 +100,7 @@ The Debezium SQL Server Connector component supports 69 options, which are liste
 | *heartbeatTopicsPrefix* (sqlserver) | The prefix that is used to name heartbeat topics.Defaults to __debezium-heartbeat. | __debezium-heartbeat | String
 | *includeSchemaChanges* (sqlserver) | Whether the connector should publish changes in the database schema to a Kafka topic with the same name as the database server ID. Each schema change will be recorded using a key that contains the database name and whose value include logical description of the new schema and optionally the DDL statement(s).The default is 'true'. This is independent of how the connector internally records database history. | true | boolean
 | *maxBatchSize* (sqlserver) | Maximum size of each batch of source records. Defaults to 2048. | 2048 | int
+| *maxIterationTransactions* (sqlserver) | This property can be used to reduce the connector memory usage footprint when changes are streamed from multiple tables per database. | 0 | int
 | *maxQueueSize* (sqlserver) | Maximum size of the queue for change events read from the database log but not yet recorded or forwarded. Defaults to 8192, and should always be larger than the maximum batch size. | 8192 | int
 | *maxQueueSizeInBytes* (sqlserver) | Maximum size of the queue in bytes for change events read from the database log but not yet recorded or forwarded. Defaults to 0. Mean the feature is not enabled | 0 | long
 | *messageKeyColumns* (sqlserver) | A semicolon-separated list of expressions that match fully-qualified tables and column(s) to be used as message key. Each expression must match the pattern ':',where the table names could be defined as (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on the specific connector,and the key columns are a comma-separated list of columns representing the custom key. For any table without an explicit key configuration the table's primary key colum [...]
@@ -114,9 +118,9 @@ The Debezium SQL Server Connector component supports 69 options, which are liste
 | *snapshotLockTimeoutMs* (sqlserver) | The maximum number of millis to wait for table locks at the beginning of a snapshot. If locks cannot be acquired in this time frame, the snapshot will be aborted. Defaults to 10 seconds | 10s | long
 | *snapshotMaxThreads* (sqlserver) | The maximum number of threads used to perform the snapshot. Defaults to 1. | 1 | int
 | *snapshotMode* (sqlserver) | The criteria for running a snapshot upon startup of the connector. Options include: 'initial' (the default) to specify the connector should run a snapshot only when no offsets are available for the logical server name; 'schema_only' to specify the connector should run a snapshot of the schema when no offsets are available for the logical server name. | initial | String
-| *snapshotSelectStatement{zwsp}Overrides* (sqlserver) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on thespecific connectors . Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The [...]
+| *snapshotSelectStatement{zwsp}Overrides* (sqlserver) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on the specific connectors. Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The  [...]
 | *sourceStructVersion* (sqlserver) | A version of the format of the publicly visible source part in the message | v2 | String
-| *sourceTimestampMode* (sqlserver) | Configures the criteria of the attached timestamp within the source record (ts_ms).Options include:'commit', (default) the source timestamp is set to the instant where the record was committed in the database'processing', the source timestamp is set to the instant where the record was processed by Debezium. | commit | String
+| *sourceTimestampMode* (sqlserver) | Configures the criteria of the attached timestamp within the source record (ts_ms). Options include: 'commit' (default), the source timestamp is set to the instant where the record was committed in the database; 'processing' (deprecated), the source timestamp is set to the instant where the record was processed by Debezium. | commit | String
 | *tableBlacklist* (sqlserver) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring (deprecated, use table.exclude.list instead) |  | String
 | *tableExcludeList* (sqlserver) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring |  | String
 | *tableIgnoreBuiltin* (sqlserver) | Flag specifying whether built-in tables should be ignored. | true | boolean
@@ -147,7 +151,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (69 parameters):
+=== Query Parameters (73 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -181,6 +185,9 @@ with the following path and query parameters:
 | *databaseHistoryKafkaRecovery{zwsp}Attempts* (sqlserver) | The number of attempts in a row that no data are returned from Kafka before recover completes. The maximum amount of time to wait after receiving no data is (recovery.attempts) x (recovery.poll.interval.ms). | 100 | int
 | *databaseHistoryKafkaRecovery{zwsp}PollIntervalMs* (sqlserver) | The number of milliseconds to wait while polling for persisted data during recovery. | 100ms | int
 | *databaseHistoryKafkaTopic* (sqlserver) | The name of the topic for the database schema history |  | String
+| *databaseHistorySkipUnparseable{zwsp}Ddl* (sqlserver) | Controls the action Debezium will take when it meets a DDL statement in binlog that it cannot parse. By default the connector will stop operating, but by changing the setting it can ignore the statements which it cannot parse. If skipping is enabled then Debezium can miss metadata changes. | false | boolean
+| *databaseHistoryStoreOnly{zwsp}CapturedTablesDdl* (sqlserver) | Controls what DDL will Debezium store in database history. By default (false) Debezium will store all incoming DDL statements. If set to true, then only DDL that manipulates a captured table will be stored. | false | boolean
+| *databaseHistoryStoreOnly{zwsp}MonitoredTablesDdl* (sqlserver) | Controls what DDL will Debezium store in database history. By default (false) Debezium will store all incoming DDL statements. If set to true, then only DDL that manipulates a monitored table will be stored (deprecated, use database.history.store.only.captured.tables.ddl instead) | false | boolean
 | *databaseHostname* (sqlserver) | Resolvable hostname or IP address of the database server. |  | String
 | *databaseInstance* (sqlserver) | The SQL Server instance name |  | String
 | *databasePassword* (sqlserver) | *Required* Password of the database user to be used when connecting to the database. |  | String
@@ -195,6 +202,7 @@ with the following path and query parameters:
 | *heartbeatTopicsPrefix* (sqlserver) | The prefix that is used to name heartbeat topics.Defaults to __debezium-heartbeat. | __debezium-heartbeat | String
 | *includeSchemaChanges* (sqlserver) | Whether the connector should publish changes in the database schema to a Kafka topic with the same name as the database server ID. Each schema change will be recorded using a key that contains the database name and whose value include logical description of the new schema and optionally the DDL statement(s).The default is 'true'. This is independent of how the connector internally records database history. | true | boolean
 | *maxBatchSize* (sqlserver) | Maximum size of each batch of source records. Defaults to 2048. | 2048 | int
+| *maxIterationTransactions* (sqlserver) | This property can be used to reduce the connector memory usage footprint when changes are streamed from multiple tables per database. | 0 | int
 | *maxQueueSize* (sqlserver) | Maximum size of the queue for change events read from the database log but not yet recorded or forwarded. Defaults to 8192, and should always be larger than the maximum batch size. | 8192 | int
 | *maxQueueSizeInBytes* (sqlserver) | Maximum size of the queue in bytes for change events read from the database log but not yet recorded or forwarded. Defaults to 0. Mean the feature is not enabled | 0 | long
 | *messageKeyColumns* (sqlserver) | A semicolon-separated list of expressions that match fully-qualified tables and column(s) to be used as message key. Each expression must match the pattern ':',where the table names could be defined as (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on the specific connector,and the key columns are a comma-separated list of columns representing the custom key. For any table without an explicit key configuration the table's primary key colum [...]
@@ -212,9 +220,9 @@ with the following path and query parameters:
 | *snapshotLockTimeoutMs* (sqlserver) | The maximum number of millis to wait for table locks at the beginning of a snapshot. If locks cannot be acquired in this time frame, the snapshot will be aborted. Defaults to 10 seconds | 10s | long
 | *snapshotMaxThreads* (sqlserver) | The maximum number of threads used to perform the snapshot. Defaults to 1. | 1 | int
 | *snapshotMode* (sqlserver) | The criteria for running a snapshot upon startup of the connector. Options include: 'initial' (the default) to specify the connector should run a snapshot only when no offsets are available for the logical server name; 'schema_only' to specify the connector should run a snapshot of the schema when no offsets are available for the logical server name. | initial | String
-| *snapshotSelectStatement{zwsp}Overrides* (sqlserver) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on thespecific connectors . Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The [...]
+| *snapshotSelectStatement{zwsp}Overrides* (sqlserver) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on the specific connectors. Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The  [...]
 | *sourceStructVersion* (sqlserver) | A version of the format of the publicly visible source part in the message | v2 | String
-| *sourceTimestampMode* (sqlserver) | Configures the criteria of the attached timestamp within the source record (ts_ms).Options include:'commit', (default) the source timestamp is set to the instant where the record was committed in the database'processing', the source timestamp is set to the instant where the record was processed by Debezium. | commit | String
+| *sourceTimestampMode* (sqlserver) | Configures the criteria of the attached timestamp within the source record (ts_ms). Options include: 'commit' (default), the source timestamp is set to the instant where the record was committed in the database; 'processing' (deprecated), the source timestamp is set to the instant where the record was processed by Debezium. | commit | String
 | *tableBlacklist* (sqlserver) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring (deprecated, use table.exclude.list instead) |  | String
 | *tableExcludeList* (sqlserver) | A comma-separated list of regular expressions that match the fully-qualified names of tables to be excluded from monitoring |  | String
 | *tableIgnoreBuiltin* (sqlserver) | Flag specifying whether built-in tables should be ignored. | true | boolean
diff --git a/parent/pom.xml b/parent/pom.xml
index 53a4bf9..32c5eae 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -155,8 +155,8 @@
         <docker-java-version>3.2.10</docker-java-version>
         <dozer-version>6.5.2</dozer-version>
         <dropbox-version>3.2.0</dropbox-version>
-        <debezium-version>1.5.2.Final</debezium-version>
-        <debezium-mysql-connector-version>8.0.22</debezium-mysql-connector-version>
+        <debezium-version>1.6.0.Final</debezium-version>
+        <debezium-mysql-connector-version>8.0.25</debezium-mysql-connector-version>
         <eddsa-version>0.3.0</eddsa-version>
         <egit-github-core-version>2.1.5</egit-github-core-version>
         <ehcache3-version>3.9.4</ehcache3-version>