You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by oa...@apache.org on 2020/01/16 11:58:12 UTC

[camel] branch master updated: CAMEL-14188: Upgrade Debezium to 1.0

This is an automated email from the ASF dual-hosted git repository.

oalsafi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new faa3182  CAMEL-14188: Upgrade Debezium to 1.0
     new d60dfcb  Merge pull request #3487 from omarsmak/CAMEL-14188
faa3182 is described below

commit faa31827ad8e347f49649571f3cb4ce154a85cc5
Author: Omar Al-Safi <om...@gmail.com>
AuthorDate: Thu Jan 16 12:50:20 2020 +0100

    CAMEL-14188: Upgrade Debezium to 1.0
---
 .../component/debezium/DebeziumEndpointTest.java   |  7 +--
 .../src/main/docs/debezium-postgres-component.adoc |  3 +-
 components/camel-debezium-sqlserver/pom.xml        |  4 +-
 .../main/docs/debezium-sqlserver-component.adoc    |  6 ++-
 .../DebeziumPostgresEndpointBuilderFactory.java    | 16 +++++++
 .../DebeziumSqlserverEndpointBuilderFactory.java   | 54 ++++++++++++++++++++--
 parent/pom.xml                                     |  4 +-
 7 files changed, 80 insertions(+), 14 deletions(-)

diff --git a/components/camel-debezium-common/camel-debezium-common-component/src/test/java/org/apache/camel/component/debezium/DebeziumEndpointTest.java b/components/camel-debezium-common/camel-debezium-common-component/src/test/java/org/apache/camel/component/debezium/DebeziumEndpointTest.java
index 19b5613..05d71a3 100644
--- a/components/camel-debezium-common/camel-debezium-common-component/src/test/java/org/apache/camel/component/debezium/DebeziumEndpointTest.java
+++ b/components/camel-debezium-common/camel-debezium-common-component/src/test/java/org/apache/camel/component/debezium/DebeziumEndpointTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.debezium;
 
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -205,7 +206,7 @@ public class DebeziumEndpointTest {
 
         after.put("id", (byte)1);
         source.put("lsn", 1234);
-        final Struct payload = envelope.create(after, source, System.nanoTime());
+        final Struct payload = envelope.create(after, source, Instant.now());
         return new SourceRecord(new HashMap<>(), createSourceOffset(), "dummy", createKeySchema(),
                 createKeyRecord(), envelope.schema(), payload);
     }
@@ -216,7 +217,7 @@ public class DebeziumEndpointTest {
                 .withSource(SchemaBuilder.struct().build()).build();
         final Struct before = new Struct(recordSchema);
         before.put("id", (byte)1);
-        final Struct payload = envelope.delete(before, null, System.nanoTime());
+        final Struct payload = envelope.delete(before, null, Instant.now());
         return new SourceRecord(new HashMap<>(), createSourceOffset(), "dummy", createKeySchema(),
                 createKeyRecord(), envelope.schema(), payload);
     }
@@ -243,7 +244,7 @@ public class DebeziumEndpointTest {
         before.put("id", (byte)1);
         after.put("id", (byte)2);
         source.put("lsn", 1234);
-        final Struct payload = envelope.update(before, after, source, System.nanoTime());
+        final Struct payload = envelope.update(before, after, source, Instant.now());
         return new SourceRecord(new HashMap<>(), createSourceOffset(), "dummy", createKeySchema(),
                 createKeyRecord(), envelope.schema(), payload);
     }
diff --git a/components/camel-debezium-postgres/src/main/docs/debezium-postgres-component.adoc b/components/camel-debezium-postgres/src/main/docs/debezium-postgres-component.adoc
index 6a0c357..1f84788 100644
--- a/components/camel-debezium-postgres/src/main/docs/debezium-postgres-component.adoc
+++ b/components/camel-debezium-postgres/src/main/docs/debezium-postgres-component.adoc
@@ -73,7 +73,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (64 parameters):
+=== Query Parameters (65 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -115,6 +115,7 @@ with the following path and query parameters:
 | *heartbeatTopicsPrefix* (postgres) | The prefix that is used to name heartbeat topics.Defaults to __debezium-heartbeat. | __debezium-heartbeat | String
 | *hstoreHandlingMode* (postgres) | Specify how HSTORE columns should be represented in change events, including:'json' represents values as json string'map' (default) represents values using java.util.Map | json | String
 | *includeUnknownDatatypes* (postgres) | Specify whether the fields of data type not supported by Debezium should be processed:'false' (the default) omits the fields; 'true' converts the field into an implementation dependent binary representation. | false | boolean
+| *intervalHandlingMode* (postgres) | Specify how INTERVAL columns should be represented in change events, including: 'string' represents values as an exact ISO formatted string; 'numeric' (default) represents values using the inexact conversion into microseconds | numeric | String
 | *maxBatchSize* (postgres) | Maximum size of each batch of source records. Defaults to 2048. | 2048 | int
 | *maxQueueSize* (postgres) | Maximum size of the queue for change events read from the database log but not yet recorded or forwarded. Defaults to 8192, and should always be larger than the maximum batch size. | 8192 | int
 | *messageKeyColumns* (postgres) | A semicolon-separated list of expressions that match fully-qualified tables and column(s) to be used as message key. Each expression must match the pattern ':',where the table names could be defined as (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on the specific connector,and the key columns are a comma-separated list of columns representing the custom key. For any table without an explicit key configuration the table's primary key column [...]
diff --git a/components/camel-debezium-sqlserver/pom.xml b/components/camel-debezium-sqlserver/pom.xml
index 89a1700..8354b97 100644
--- a/components/camel-debezium-sqlserver/pom.xml
+++ b/components/camel-debezium-sqlserver/pom.xml
@@ -91,9 +91,7 @@
                     <dependency>
                         <groupId>io.debezium</groupId>
                         <artifactId>debezium-connector-sqlserver</artifactId>
-                        <!-- Only for the maven plugin, we will use 1.0.0.Beta2 to generate the config due to missing configs on 0.10 -->
-                        <!-- Once we have 1.0.0.Final, we will change this back to ${debezium-version} -->
-                        <version>1.0.0.Beta2</version>
+                        <version>${debezium-version}</version>
                         <scope>runtime</scope>
                     </dependency>
                 </dependencies>
diff --git a/components/camel-debezium-sqlserver/src/main/docs/debezium-sqlserver-component.adoc b/components/camel-debezium-sqlserver/src/main/docs/debezium-sqlserver-component.adoc
index 5411351..2188cd6 100644
--- a/components/camel-debezium-sqlserver/src/main/docs/debezium-sqlserver-component.adoc
+++ b/components/camel-debezium-sqlserver/src/main/docs/debezium-sqlserver-component.adoc
@@ -72,7 +72,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (45 parameters):
+=== Query Parameters (47 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -105,6 +105,7 @@ with the following path and query parameters:
 | *databasePassword* (sqlserver) | *Required* Password of the SQL Server database user to be used when connecting to the database. |  | String
 | *databasePort* (sqlserver) | Port of the SQL Server database server. | 1433 | int
 | *databaseServerName* (sqlserver) | *Required* Unique name that identifies the database server and all recorded offsets, and that is used as a prefix for all schemas and topics. Each distinct installation should have a separate namespace and be monitored by at most one Debezium connector. |  | String
+| *databaseServerTimezone* (sqlserver) | The timezone of the server used to correctly shift the commit transaction timestamp on the client side. Options include: Any valid Java ZoneId |  | String
 | *databaseUser* (sqlserver) | Name of the SQL Server database user to be used when connecting to the database. |  | String
 | *decimalHandlingMode* (sqlserver) | Specify how DECIMAL and NUMERIC columns should be represented in change events, including:'precise' (the default) uses java.math.BigDecimal to represent values, which are encoded in the change events using a binary representation and Kafka Connect's 'org.apache.kafka.connect.data.Decimal' type; 'string' uses string to represent values; 'double' represents values using Java's 'double', which may not offer the precision but will be far easier to use in [...]
 | *heartbeatIntervalMs* (sqlserver) | Length of an interval in milli-seconds in in which the connector periodically sends heartbeat messages to a heartbeat topic. Use 0 to disable heartbeat messages. Disabled by default. | 0 | int
@@ -116,13 +117,14 @@ with the following path and query parameters:
 | *snapshotDelayMs* (sqlserver) | The number of milliseconds to delay before a snapshot will begin. | 0 | long
 | *snapshotFetchSize* (sqlserver) | The maximum number of records that should be loaded into memory while performing a snapshot |  | int
 | *snapshotLockTimeoutMs* (sqlserver) | The maximum number of millis to wait for table locks at the beginning of a snapshot. If locks cannot be acquired in this time frame, the snapshot will be aborted. Defaults to 10 seconds | 10000 | long
-| *snapshotMode* (sqlserver) | The criteria for running a snapshot upon startup of the connector. Options include: 'initial' (the default) to specify the connector should run a snapshot only when no offsets are available for the logical server name; 'initial_schema_only' to specify the connector should run a snapshot of the schema when no offsets are available for the logical server name. | initial | String
+| *snapshotMode* (sqlserver) | The criteria for running a snapshot upon startup of the connector. Options include: 'initial' (the default) to specify the connector should run a snapshot only when no offsets are available for the logical server name; 'schema_only' to specify the connector should run a snapshot of the schema when no offsets are available for the logical server name. | initial | String
 | *snapshotSelectStatement Overrides* (sqlserver) | This property contains a comma-separated list of fully-qualified tables (DB_NAME.TABLE_NAME) or (SCHEMA_NAME.TABLE_NAME), depending on thespecific connectors . Select statements for the individual tables are specified in further configuration properties, one for each table, identified by the id 'snapshot.select.statement.overrides.DB_NAME.TABLE_NAME' or 'snapshot.select.statement.overrides.SCHEMA_NAME.TABLE_NAME', respectively. The valu [...]
 | *sourceStructVersion* (sqlserver) | A version of the format of the publicly visible source part in the message | v2 | String
 | *tableBlacklist* (sqlserver) | Description is not available here, please check Debezium website for corresponding key 'table.blacklist' description. |  | String
 | *tableIgnoreBuiltin* (sqlserver) | Flag specifying whether built-in tables should be ignored. | true | boolean
 | *tableWhitelist* (sqlserver) | The tables for which changes are to be captured |  | String
 | *timePrecisionMode* (sqlserver) | Time, date, and timestamps can be represented with different kinds of precisions, including:'adaptive' (the default) bases the precision of time, date, and timestamp values on the database column's precision; 'adaptive_time_microseconds' like 'adaptive' mode, but TIME fields always use microseconds precision;'connect' always represents time, date, and timestamp values using Kafka Connect's built-in representations for Time, Date, and Timestamp, which u [...]
+| *tombstonesOnDelete* (sqlserver) | Whether delete operations should be represented by a delete event and a subsequent tombstone event (true) or only by a delete event (false). Emitting the tombstone event (the default behavior) allows Kafka to completely delete all events pertaining to the given key once the source record got deleted. | false | boolean
 |===
 // endpoint options: END
 // spring-boot-auto-configure options: START
diff --git a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/DebeziumPostgresEndpointBuilderFactory.java b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/DebeziumPostgresEndpointBuilderFactory.java
index 936daf2..fa277ce 100644
--- a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/DebeziumPostgresEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/DebeziumPostgresEndpointBuilderFactory.java
@@ -637,6 +637,22 @@ public interface DebeziumPostgresEndpointBuilderFactory {
             return this;
         }
         /**
+         * Specify how INTERVAL columns should be represented in change events,
+         * including: 'string' represents values as an exact ISO formatted
+         * string; 'numeric' (default) represents values using the inexact
+         * conversion into microseconds.
+         * 
+         * The option is a: <code>java.lang.String</code> type.
+         * 
+         * Default: numeric
+         * Group: postgres
+         */
+        default DebeziumPostgresEndpointBuilder intervalHandlingMode(
+                String intervalHandlingMode) {
+            doSetProperty("intervalHandlingMode", intervalHandlingMode);
+            return this;
+        }
+        /**
          * Maximum size of each batch of source records. Defaults to 2048.
          * 
          * The option is a: <code>int</code> type.
diff --git a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/DebeziumSqlserverEndpointBuilderFactory.java b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/DebeziumSqlserverEndpointBuilderFactory.java
index 07172b3..501e549 100644
--- a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/DebeziumSqlserverEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/DebeziumSqlserverEndpointBuilderFactory.java
@@ -487,6 +487,20 @@ public interface DebeziumSqlserverEndpointBuilderFactory {
             return this;
         }
         /**
+         * The timezone of the server used to correctly shift the commit
+         * transaction timestamp on the client side. Options include: Any
+         * valid Java ZoneId.
+         * 
+         * The option is a: <code>java.lang.String</code> type.
+         * 
+         * Group: sqlserver
+         */
+        default DebeziumSqlserverEndpointBuilder databaseServerTimezone(
+                String databaseServerTimezone) {
+            doSetProperty("databaseServerTimezone", databaseServerTimezone);
+            return this;
+        }
+        /**
          * Name of the SQL Server database user to be used when connecting to
          * the database.
          * 
@@ -751,9 +765,9 @@ public interface DebeziumSqlserverEndpointBuilderFactory {
          * The criteria for running a snapshot upon startup of the connector.
          * Options include: 'initial' (the default) to specify the connector
          * should run a snapshot only when no offsets are available for the
-         * logical server name; 'initial_schema_only' to specify the connector
-         * should run a snapshot of the schema when no offsets are available for
-         * the logical server name.
+         * logical server name; 'schema_only' to specify the connector should
+         * run a snapshot of the schema when no offsets are available for the
+         * logical server name.
          * 
          * The option is a: <code>java.lang.String</code> type.
          * 
@@ -873,6 +887,40 @@ public interface DebeziumSqlserverEndpointBuilderFactory {
             doSetProperty("timePrecisionMode", timePrecisionMode);
             return this;
         }
+        /**
+         * Whether delete operations should be represented by a delete event and
+         * a subsequent tombstone event (true) or only by a delete event (false).
+         * Emitting the tombstone event (the default behavior) allows Kafka to
+         * completely delete all events pertaining to the given key once the
+         * source record got deleted.
+         * 
+         * The option is a: <code>boolean</code> type.
+         * 
+         * Default: false
+         * Group: sqlserver
+         */
+        default DebeziumSqlserverEndpointBuilder tombstonesOnDelete(
+                boolean tombstonesOnDelete) {
+            doSetProperty("tombstonesOnDelete", tombstonesOnDelete);
+            return this;
+        }
+        /**
+         * Whether delete operations should be represented by a delete event and
+         * a subsequent tombstone event (true) or only by a delete event (false).
+         * Emitting the tombstone event (the default behavior) allows Kafka to
+         * completely delete all events pertaining to the given key once the
+         * source record got deleted.
+         * 
+         * The option will be converted to a <code>boolean</code> type.
+         * 
+         * Default: false
+         * Group: sqlserver
+         */
+        default DebeziumSqlserverEndpointBuilder tombstonesOnDelete(
+                String tombstonesOnDelete) {
+            doSetProperty("tombstonesOnDelete", tombstonesOnDelete);
+            return this;
+        }
     }
 
     /**
diff --git a/parent/pom.xml b/parent/pom.xml
index e24b8fc..0492020 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -172,7 +172,7 @@
         <dozer-version>6.5.0</dozer-version>
         <drools-version>7.31.0.Final</drools-version>
         <dropbox-version>3.1.3</dropbox-version>
-        <debezium-version>0.10.0.Final</debezium-version>
+        <debezium-version>1.0.0.Final</debezium-version>
         <debezium-mysql-connector-version>8.0.15</debezium-mysql-connector-version>
         <egit-github-core-version>2.1.5</egit-github-core-version>
         <egit-github-core-bundle-version>2.1.5_1</egit-github-core-bundle-version>
@@ -3770,7 +3770,7 @@
               <artifactId>pulsar-client-admin</artifactId>
               <version>${pulsar-version}</version>
             </dependency>
-            
+
             <!-- apicurio -->
             <dependency>
                 <groupId>io.apicurio</groupId>