You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/08/24 14:58:48 UTC

[camel] branch main updated: CAMEL-16882 Add ddl sql output in header (#5980)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 2029cbe  CAMEL-16882 Add ddl sql output in header (#5980)
2029cbe is described below

commit 2029cbe05cbf02b3d0027320e7bc87884f4a80c4
Author: Winger <wi...@gmail.com>
AuthorDate: Tue Aug 24 22:58:23 2021 +0800

    CAMEL-16882 Add ddl sql output in header (#5980)
    
    * CAMEL-16882 Add ddl sql output in header
    
    * CAMEL-16882 test class fmt
    
    * fix header key with DdlSQL
---
 .../component/debezium/DebeziumConstants.java      |  1 +
 .../camel/component/debezium/DebeziumEndpoint.java |  4 +++-
 .../component/debezium/DebeziumEndpointTest.java   | 25 ++++++++++++++++++++++
 3 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConstants.java b/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConstants.java
index 718866c..eefcebb 100644
--- a/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConstants.java
+++ b/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConstants.java
@@ -34,6 +34,7 @@ public final class DebeziumConstants {
     public static final String HEADER_OPERATION = HEADER_PREFIX + "Operation";
     public static final String HEADER_TIMESTAMP = HEADER_PREFIX + "Timestamp";
     public static final String HEADER_BEFORE = HEADER_PREFIX + "Before";
+    public static final String HEADER_DDL_SQL = HEADER_PREFIX + "DdlSQL";
 
     private DebeziumConstants() {
     }
diff --git a/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java b/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java
index 19f1c4e..f9cfc4c 100644
--- a/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java
+++ b/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java
@@ -20,6 +20,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
 import io.debezium.data.Envelope;
+import io.debezium.relational.history.HistoryRecord;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -82,7 +83,7 @@ public abstract class DebeziumEndpoint<C extends EmbeddedDebeziumConfiguration>
         final Object before = extractValueFromValueStruct(valueSchema, value, Envelope.FieldName.BEFORE);
         final Object body = extractBodyValueFromValueStruct(valueSchema, value);
         final Object timestamp = extractValueFromValueStruct(valueSchema, value, Envelope.FieldName.TIMESTAMP);
-
+        final Object ddl = extractValueFromValueStruct(valueSchema, value, HistoryRecord.Fields.DDL_STATEMENTS);
         // set message headers
         message.setHeader(DebeziumConstants.HEADER_IDENTIFIER, record.topic());
         message.setHeader(DebeziumConstants.HEADER_KEY, record.key());
@@ -90,6 +91,7 @@ public abstract class DebeziumEndpoint<C extends EmbeddedDebeziumConfiguration>
         message.setHeader(DebeziumConstants.HEADER_OPERATION, operation);
         message.setHeader(DebeziumConstants.HEADER_BEFORE, before);
         message.setHeader(DebeziumConstants.HEADER_TIMESTAMP, timestamp);
+        message.setHeader(DebeziumConstants.HEADER_DDL_SQL, ddl);
         message.setHeader(Exchange.MESSAGE_TIMESTAMP, timestamp);
 
         message.setBody(body);
diff --git a/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/test/java/org/apache/camel/component/debezium/DebeziumEndpointTest.java b/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/test/java/org/apache/camel/component/debezium/DebeziumEndpointTest.java
index 4c9c4ce..1a69941 100644
--- a/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/test/java/org/apache/camel/component/debezium/DebeziumEndpointTest.java
+++ b/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/test/java/org/apache/camel/component/debezium/DebeziumEndpointTest.java
@@ -199,6 +199,20 @@ public class DebeziumEndpointTest {
         assertNull(body);
     }
 
+    @Test
+    void testIfCreatesExchangeFromSourceDdlRecord() {
+        final SourceRecord sourceRecord = createDdlSQLRecord();
+        final Exchange exchange = debeziumEndpoint.createDbzExchange(null, sourceRecord);
+
+        // verify the exchange exists before dereferencing it, so a null result fails the assertion instead of an NPE
+        assertNotNull(exchange);
+        final Message inMessage = exchange.getIn();
+        // assert headers: the record topic is the identifier, and the DDL header carries the statement put in the record
+        assertEquals("dummy", inMessage.getHeader(DebeziumConstants.HEADER_IDENTIFIER));
+        assertEquals("SET character_set_server=utf8, collation_server=utf8_bin",
+                inMessage.getHeader(DebeziumConstants.HEADER_DDL_SQL));
+    }
+
     private SourceRecord createCreateRecord() {
         final Schema recordSchema = SchemaBuilder.struct().field("id", SchemaBuilder.int8()).build();
         final Schema sourceSchema = SchemaBuilder.struct().field("lsn", SchemaBuilder.int32()).build();
@@ -256,6 +270,17 @@ public class DebeziumEndpointTest {
                 createKeyRecord(), envelope.schema(), payload);
     }
 
+    private SourceRecord createDdlSQLRecord() {
+        // Builds a record on topic "dummy" whose value is a bare struct (not wrapped in an Envelope) holding
+        // the DDL statement in a single "ddl" string field; both the key schema and the key are null.
+        final Schema recordSchema = SchemaBuilder.struct().field("ddl", SchemaBuilder.string()).build();
+        final Struct recordValue = new Struct(recordSchema);
+        recordValue.put("ddl", "SET character_set_server=utf8, collation_server=utf8_bin");
+        return new SourceRecord(
+                new HashMap<>(), createSourceOffset(), "dummy", null,
+                null, recordValue.schema(), recordValue);
+    }
+
     private SourceRecord createUnknownUnnamedSchemaRecord() {
         final Schema recordSchema = SchemaBuilder.struct().field("id", SchemaBuilder.int8()).build();
         final Struct before = new Struct(recordSchema);