You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/08/24 14:58:48 UTC
[camel] branch main updated: CAMEL-16882 Add ddl sql output in header (#5980)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 2029cbe CAMEL-16882 Add ddl sql output in header (#5980)
2029cbe is described below
commit 2029cbe05cbf02b3d0027320e7bc87884f4a80c4
Author: Winger <wi...@gmail.com>
AuthorDate: Tue Aug 24 22:58:23 2021 +0800
CAMEL-16882 Add ddl sql output in header (#5980)
* CAMEL-16882 Add ddl sql output in header
* CAMEL-16882 test class fmt
* fix header key with DdlSQL
---
.../component/debezium/DebeziumConstants.java | 1 +
.../camel/component/debezium/DebeziumEndpoint.java | 4 +++-
.../component/debezium/DebeziumEndpointTest.java | 25 ++++++++++++++++++++++
3 files changed, 29 insertions(+), 1 deletion(-)
diff --git a/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConstants.java b/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConstants.java
index 718866c..eefcebb 100644
--- a/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConstants.java
+++ b/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumConstants.java
@@ -34,6 +34,7 @@ public final class DebeziumConstants {
public static final String HEADER_OPERATION = HEADER_PREFIX + "Operation";
public static final String HEADER_TIMESTAMP = HEADER_PREFIX + "Timestamp";
public static final String HEADER_BEFORE = HEADER_PREFIX + "Before";
+ public static final String HEADER_DDL_SQL = HEADER_PREFIX + "DdlSQL";
private DebeziumConstants() {
}
diff --git a/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java b/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java
index 19f1c4e..f9cfc4c 100644
--- a/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java
+++ b/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java
@@ -20,6 +20,7 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
import io.debezium.data.Envelope;
+import io.debezium.relational.history.HistoryRecord;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
@@ -82,7 +83,7 @@ public abstract class DebeziumEndpoint<C extends EmbeddedDebeziumConfiguration>
final Object before = extractValueFromValueStruct(valueSchema, value, Envelope.FieldName.BEFORE);
final Object body = extractBodyValueFromValueStruct(valueSchema, value);
final Object timestamp = extractValueFromValueStruct(valueSchema, value, Envelope.FieldName.TIMESTAMP);
-
+ final Object ddl = extractValueFromValueStruct(valueSchema, value, HistoryRecord.Fields.DDL_STATEMENTS);
// set message headers
message.setHeader(DebeziumConstants.HEADER_IDENTIFIER, record.topic());
message.setHeader(DebeziumConstants.HEADER_KEY, record.key());
@@ -90,6 +91,7 @@ public abstract class DebeziumEndpoint<C extends EmbeddedDebeziumConfiguration>
message.setHeader(DebeziumConstants.HEADER_OPERATION, operation);
message.setHeader(DebeziumConstants.HEADER_BEFORE, before);
message.setHeader(DebeziumConstants.HEADER_TIMESTAMP, timestamp);
+ message.setHeader(DebeziumConstants.HEADER_DDL_SQL, ddl);
message.setHeader(Exchange.MESSAGE_TIMESTAMP, timestamp);
message.setBody(body);
diff --git a/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/test/java/org/apache/camel/component/debezium/DebeziumEndpointTest.java b/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/test/java/org/apache/camel/component/debezium/DebeziumEndpointTest.java
index 4c9c4ce..1a69941 100644
--- a/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/test/java/org/apache/camel/component/debezium/DebeziumEndpointTest.java
+++ b/components/camel-debezium/camel-debezium-common/camel-debezium-common-component/src/test/java/org/apache/camel/component/debezium/DebeziumEndpointTest.java
@@ -199,6 +199,20 @@ public class DebeziumEndpointTest {
assertNull(body);
}
+ @Test
+ void testIfCreatesExchangeFromSourceDdlRecord() {
+ final SourceRecord sourceRecord = createDdlSQLRecord();
+
+ final Exchange exchange = debeziumEndpoint.createDbzExchange(null, sourceRecord);
+ final Message inMessage = exchange.getIn();
+
+ assertNotNull(exchange);
+ // assert headers
+ assertEquals("dummy", inMessage.getHeader(DebeziumConstants.HEADER_IDENTIFIER));
+ assertEquals("SET character_set_server=utf8, collation_server=utf8_bin",
+ inMessage.getHeader(DebeziumConstants.HEADER_DDL_SQL));
+ }
+
private SourceRecord createCreateRecord() {
final Schema recordSchema = SchemaBuilder.struct().field("id", SchemaBuilder.int8()).build();
final Schema sourceSchema = SchemaBuilder.struct().field("lsn", SchemaBuilder.int32()).build();
@@ -256,6 +270,17 @@ public class DebeziumEndpointTest {
createKeyRecord(), envelope.schema(), payload);
}
+ private SourceRecord createDdlSQLRecord() {
+ final Schema recordSchema = SchemaBuilder.struct().field("ddl", SchemaBuilder.string()).build();
+ Envelope.defineSchema().withName("dummy.Envelope").withRecord(recordSchema).withSource(SchemaBuilder.struct().build())
+ .build();
+ final Struct recordValue = new Struct(recordSchema);
+ recordValue.put("ddl", "SET character_set_server=utf8, collation_server=utf8_bin");
+ return new SourceRecord(
+ new HashMap<>(), createSourceOffset(), "dummy", null,
+ null, recordValue.schema(), recordValue);
+ }
+
private SourceRecord createUnknownUnnamedSchemaRecord() {
final Schema recordSchema = SchemaBuilder.struct().field("id", SchemaBuilder.int8()).build();
final Struct before = new Struct(recordSchema);