You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by pp...@apache.org on 2021/01/12 12:58:45 UTC
[camel-quarkus] 01/03: Debezium MongoDB Connector native support
#1190
This is an automated email from the ASF dual-hosted git repository.
ppalaga pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
commit a17a5c3f04749fd9b5af67766b4e00f5413d32a8
Author: JiriOndrusek <on...@gmail.com>
AuthorDate: Fri Dec 11 15:27:53 2020 +0100
Debezium MongoDB Connector native support #1190
---
.../reference/extensions/debezium-mongodb.adoc | 8 +-
.../reference/components/debezium-mongodb.adoc | 6 +-
.../debezium-mongodb/integration-test/pom.xml | 66 ---------
.../mongodb/it/DebeziumMongodbResource.java | 51 -------
extensions-jvm/pom.xml | 1 -
.../debezium-mongodb/deployment/pom.xml | 14 +-
.../deployment/DebeziumMongodbProcessor.java | 23 ++-
.../debezium-mongodb/pom.xml | 1 -
.../debezium-mongodb/runtime/pom.xml | 2 +-
.../main/resources/META-INF/quarkus-extension.yaml | 3 +-
extensions/pom.xml | 1 +
integration-tests/debezium/pom.xml | 24 ++++
.../common/it/DebeziumMongodbResource.java | 69 +++++++++
.../quarkus/component/debezium/common/it/Type.java | 2 +-
.../debezium/common/it/AbstractDebeziumTest.java | 41 +++---
.../common/it/AbstractDebeziumTestResource.java | 13 +-
.../common/it/mongodb/DebeziumMongodbIT.java | 18 +--
.../common/it/mongodb/DebeziumMongodbTest.java | 160 +++++++++++++++++++++
.../it/mongodb/DebeziumMongodbTestResource.java | 95 ++++++++++++
.../sqlserver/DebeziumSqlserverTestResource.java | 5 +-
.../debezium/src/test/resources/initMongodb.txt | 36 +++++
pom.xml | 2 +-
poms/bom-test/pom.xml | 5 +
poms/bom/pom.xml | 11 +-
24 files changed, 465 insertions(+), 192 deletions(-)
diff --git a/docs/modules/ROOT/pages/reference/extensions/debezium-mongodb.adoc b/docs/modules/ROOT/pages/reference/extensions/debezium-mongodb.adoc
index 31e93f1..92ed46c 100644
--- a/docs/modules/ROOT/pages/reference/extensions/debezium-mongodb.adoc
+++ b/docs/modules/ROOT/pages/reference/extensions/debezium-mongodb.adoc
@@ -3,15 +3,15 @@
= Debezium MongoDB Connector
:page-aliases: extensions/debezium-mongodb.adoc
:cq-artifact-id: camel-quarkus-debezium-mongodb
-:cq-native-supported: false
-:cq-status: Preview
+:cq-native-supported: true
+:cq-status: Stable
:cq-description: Capture changes from a MongoDB database.
:cq-deprecated: false
:cq-jvm-since: 1.0.0
-:cq-native-since: 1.0.0
+:cq-native-since: 1.6.0
[.badges]
-[.badge-key]##JVM since##[.badge-supported]##1.0.0## [.badge-key]##Native##[.badge-unsupported]##unsupported##
+[.badge-key]##JVM since##[.badge-supported]##1.0.0## [.badge-key]##Native since##[.badge-supported]##1.6.0##
Capture changes from a MongoDB database.
diff --git a/docs/modules/ROOT/partials/reference/components/debezium-mongodb.adoc b/docs/modules/ROOT/partials/reference/components/debezium-mongodb.adoc
index e53e90c..e4c1c31 100644
--- a/docs/modules/ROOT/partials/reference/components/debezium-mongodb.adoc
+++ b/docs/modules/ROOT/partials/reference/components/debezium-mongodb.adoc
@@ -2,11 +2,11 @@
// This file was generated by camel-quarkus-maven-plugin:update-extension-doc-page
:cq-artifact-id: camel-quarkus-debezium-mongodb
:cq-artifact-id-base: debezium-mongodb
-:cq-native-supported: false
-:cq-status: Preview
+:cq-native-supported: true
+:cq-status: Stable
:cq-deprecated: false
:cq-jvm-since: 1.0.0
-:cq-native-since: 1.0.0
+:cq-native-since: 1.6.0
:cq-camel-part-name: debezium-mongodb
:cq-camel-part-title: Debezium MongoDB Connector
:cq-camel-part-description: Capture changes from a MongoDB database.
diff --git a/extensions-jvm/debezium-mongodb/integration-test/pom.xml b/extensions-jvm/debezium-mongodb/integration-test/pom.xml
deleted file mode 100644
index 80ed99c..0000000
--- a/extensions-jvm/debezium-mongodb/integration-test/pom.xml
+++ /dev/null
@@ -1,66 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.camel.quarkus</groupId>
- <artifactId>camel-quarkus-build-parent-it</artifactId>
- <version>1.6.0-SNAPSHOT</version>
- <relativePath>../../../poms/build-parent-it/pom.xml</relativePath>
- </parent>
-
- <artifactId>camel-quarkus-debezium-mongodb-integration-test</artifactId>
- <name>Camel Quarkus :: Debezium MongoDB Connector :: Integration Test</name>
- <description>Integration tests for Camel Quarkus Debezium MongoDB Connector extension</description>
-
- <properties>
- <!-- mvnd, a.k.a. Maven Daemon: https://github.com/mvndaemon/mvnd -->
- <!-- The following rule tells mvnd to build the listed deployment modules before this module. -->
- <!-- This is important because mvnd builds modules in parallel by default. The deployment modules are not -->
- <!-- explicit dependencies of this module in the Maven sense, although they are required by the Quarkus Maven plugin. -->
- <!-- Please update the rule whenever you change the dependencies of this module by running -->
- <!-- mvn process-resources -Pformat from the root directory -->
- <mvnd.builder.rule>camel-quarkus-debezium-mongodb-deployment</mvnd.builder.rule>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.camel.quarkus</groupId>
- <artifactId>camel-quarkus-debezium-mongodb</artifactId>
- </dependency>
- <dependency>
- <groupId>io.quarkus</groupId>
- <artifactId>quarkus-resteasy</artifactId>
- </dependency>
-
- <!-- test dependencies -->
- <dependency>
- <groupId>io.quarkus</groupId>
- <artifactId>quarkus-junit5</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>io.rest-assured</groupId>
- <artifactId>rest-assured</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
-</project>
diff --git a/extensions-jvm/debezium-mongodb/integration-test/src/main/java/org/apache/camel/quarkus/component/debezium/mongodb/it/DebeziumMongodbResource.java b/extensions-jvm/debezium-mongodb/integration-test/src/main/java/org/apache/camel/quarkus/component/debezium/mongodb/it/DebeziumMongodbResource.java
deleted file mode 100644
index 6341702..0000000
--- a/extensions-jvm/debezium-mongodb/integration-test/src/main/java/org/apache/camel/quarkus/component/debezium/mongodb/it/DebeziumMongodbResource.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.quarkus.component.debezium.mongodb.it;
-
-import javax.enterprise.context.ApplicationScoped;
-import javax.inject.Inject;
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
-import org.apache.camel.CamelContext;
-import org.jboss.logging.Logger;
-
-@Path("/debezium-mongodb")
-@ApplicationScoped
-public class DebeziumMongodbResource {
-
- private static final Logger LOG = Logger.getLogger(DebeziumMongodbResource.class);
-
- private static final String COMPONENT_DEBEZIUM_MONGODB = "debezium-mongodb";
- @Inject
- CamelContext context;
-
- @Path("/load/component/debezium-mongodb")
- @GET
- @Produces(MediaType.TEXT_PLAIN)
- public Response loadComponentDebeziumMongodb() throws Exception {
- /* This is an autogenerated test */
- if (context.getComponent(COMPONENT_DEBEZIUM_MONGODB) != null) {
- return Response.ok().build();
- }
- LOG.warnf("Could not load [%s] from the Camel context", COMPONENT_DEBEZIUM_MONGODB);
- return Response.status(500, COMPONENT_DEBEZIUM_MONGODB + " could not be loaded from the Camel context").build();
- }
-}
diff --git a/extensions-jvm/pom.xml b/extensions-jvm/pom.xml
index 6d803fe..bb4ec67 100644
--- a/extensions-jvm/pom.xml
+++ b/extensions-jvm/pom.xml
@@ -59,7 +59,6 @@
<module>cometd</module>
<module>corda</module>
<module>couchbase</module>
- <module>debezium-mongodb</module>
<module>digitalocean</module>
<module>djl</module>
<module>dns</module>
diff --git a/extensions-jvm/debezium-mongodb/deployment/pom.xml b/extensions/debezium-mongodb/deployment/pom.xml
similarity index 87%
rename from extensions-jvm/debezium-mongodb/deployment/pom.xml
rename to extensions/debezium-mongodb/deployment/pom.xml
index e13bb64..d4523ba 100644
--- a/extensions-jvm/debezium-mongodb/deployment/pom.xml
+++ b/extensions/debezium-mongodb/deployment/pom.xml
@@ -17,7 +17,9 @@
limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.camel.quarkus</groupId>
@@ -36,15 +38,19 @@
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
- <artifactId>camel-quarkus-support-debezium-deployment</artifactId>
+ <artifactId>camel-quarkus-debezium-mongodb</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
- <artifactId>camel-quarkus-mongodb-deployment</artifactId>
+ <artifactId>camel-quarkus-support-debezium-deployment</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-mongodb-client-deployment</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
- <artifactId>camel-quarkus-debezium-mongodb</artifactId>
+ <artifactId>camel-quarkus-mongodb-deployment</artifactId>
</dependency>
</dependencies>
diff --git a/extensions-jvm/debezium-mongodb/deployment/src/main/java/org/apache/camel/quarkus/component/debezium/mongodb/deployment/DebeziumMongodbProcessor.java b/extensions/debezium-mongodb/deployment/src/main/java/org/apache/camel/quarkus/component/debezium/mongodb/deployment/DebeziumMongodbProcessor.java
similarity index 62%
rename from extensions-jvm/debezium-mongodb/deployment/src/main/java/org/apache/camel/quarkus/component/debezium/mongodb/deployment/DebeziumMongodbProcessor.java
rename to extensions/debezium-mongodb/deployment/src/main/java/org/apache/camel/quarkus/component/debezium/mongodb/deployment/DebeziumMongodbProcessor.java
index 2231405..45c7089 100644
--- a/extensions-jvm/debezium-mongodb/deployment/src/main/java/org/apache/camel/quarkus/component/debezium/mongodb/deployment/DebeziumMongodbProcessor.java
+++ b/extensions/debezium-mongodb/deployment/src/main/java/org/apache/camel/quarkus/component/debezium/mongodb/deployment/DebeziumMongodbProcessor.java
@@ -16,16 +16,13 @@
*/
package org.apache.camel.quarkus.component.debezium.mongodb.deployment;
+import io.debezium.connector.mongodb.MongoDbConnector;
+import io.debezium.connector.mongodb.MongoDbConnectorTask;
import io.quarkus.deployment.annotations.BuildStep;
-import io.quarkus.deployment.annotations.ExecutionTime;
-import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.FeatureBuildItem;
-import io.quarkus.deployment.pkg.steps.NativeBuild;
-import org.apache.camel.quarkus.core.JvmOnlyRecorder;
-import org.jboss.logging.Logger;
+import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
class DebeziumMongodbProcessor {
- private static final Logger LOG = Logger.getLogger(DebeziumMongodbProcessor.class);
private static final String FEATURE = "camel-debezium-mongodb";
@@ -34,14 +31,10 @@ class DebeziumMongodbProcessor {
return new FeatureBuildItem(FEATURE);
}
- /**
- * Remove this once this extension starts supporting the native mode.
- */
- @BuildStep(onlyIf = NativeBuild.class)
- @Record(value = ExecutionTime.RUNTIME_INIT)
- void warnJvmInNative(JvmOnlyRecorder recorder) {
- JvmOnlyRecorder.warnJvmInNative(LOG, FEATURE); // warn at build time
- recorder.warnJvmInNative(FEATURE); // warn at runtime
+ @BuildStep
+ ReflectiveClassBuildItem reflectiveClasses() {
+ return new ReflectiveClassBuildItem(false, false,
+ new String[] { MongoDbConnector.class.getName(),
+ MongoDbConnectorTask.class.getName() });
}
-
}
diff --git a/extensions-jvm/debezium-mongodb/pom.xml b/extensions/debezium-mongodb/pom.xml
similarity index 97%
rename from extensions-jvm/debezium-mongodb/pom.xml
rename to extensions/debezium-mongodb/pom.xml
index 2d64168..8d2cdd8 100644
--- a/extensions-jvm/debezium-mongodb/pom.xml
+++ b/extensions/debezium-mongodb/pom.xml
@@ -33,6 +33,5 @@
<modules>
<module>deployment</module>
<module>runtime</module>
- <module>integration-test</module>
</modules>
</project>
diff --git a/extensions-jvm/debezium-mongodb/runtime/pom.xml b/extensions/debezium-mongodb/runtime/pom.xml
similarity index 98%
rename from extensions-jvm/debezium-mongodb/runtime/pom.xml
rename to extensions/debezium-mongodb/runtime/pom.xml
index 60820b7..28165cf 100644
--- a/extensions-jvm/debezium-mongodb/runtime/pom.xml
+++ b/extensions/debezium-mongodb/runtime/pom.xml
@@ -31,7 +31,7 @@
<properties>
<camel.quarkus.jvmSince>1.0.0</camel.quarkus.jvmSince>
- <camel.quarkus.nativeSince>1.0.0</camel.quarkus.nativeSince>
+ <camel.quarkus.nativeSince>1.6.0</camel.quarkus.nativeSince>
</properties>
<dependencyManagement>
diff --git a/extensions-jvm/debezium-mongodb/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/extensions/debezium-mongodb/runtime/src/main/resources/META-INF/quarkus-extension.yaml
similarity index 97%
rename from extensions-jvm/debezium-mongodb/runtime/src/main/resources/META-INF/quarkus-extension.yaml
rename to extensions/debezium-mongodb/runtime/src/main/resources/META-INF/quarkus-extension.yaml
index ce828ea..d6d0ac4 100644
--- a/extensions-jvm/debezium-mongodb/runtime/src/main/resources/META-INF/quarkus-extension.yaml
+++ b/extensions/debezium-mongodb/runtime/src/main/resources/META-INF/quarkus-extension.yaml
@@ -24,9 +24,8 @@
name: "Camel Debezium MongoDB Connector"
description: "Capture changes from a MongoDB database"
metadata:
- unlisted: true
guide: "https://camel.apache.org/camel-quarkus/latest/reference/extensions/debezium-mongodb.html"
categories:
- "integration"
status:
- - "preview"
+ - "stable"
diff --git a/extensions/pom.xml b/extensions/pom.xml
index 445b245..6ea25f0 100644
--- a/extensions/pom.xml
+++ b/extensions/pom.xml
@@ -91,6 +91,7 @@
<module>csimple</module>
<module>csv</module>
<module>dataformat</module>
+ <module>debezium-mongodb</module>
<module>debezium-mysql</module>
<module>debezium-postgres</module>
<module>debezium-sqlserver</module>
diff --git a/integration-tests/debezium/pom.xml b/integration-tests/debezium/pom.xml
index e17770d..c81fb2b 100644
--- a/integration-tests/debezium/pom.xml
+++ b/integration-tests/debezium/pom.xml
@@ -57,6 +57,10 @@
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-debezium-postgres</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-debezium-mongodb</artifactId>
+ </dependency>
<dependency>
<groupId>io.quarkus</groupId>
@@ -98,6 +102,26 @@
<artifactId>mssqlserver</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>mongodb</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory -->
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-debezium-mongodb-deployment</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory -->
<dependency>
diff --git a/integration-tests/debezium/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/DebeziumMongodbResource.java b/integration-tests/debezium/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/DebeziumMongodbResource.java
new file mode 100644
index 0000000..336374b
--- /dev/null
+++ b/integration-tests/debezium/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/DebeziumMongodbResource.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.debezium.common.it;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+@Path("/debezium-mongodb")
+@ApplicationScoped
+public class DebeziumMongodbResource extends AbstractDebeziumResource {
+
+ public DebeziumMongodbResource() {
+ super(Type.mongodb);
+ }
+
+ @Path("/receiveAsRecord")
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ public Record receiveAsRecord() {
+ return super.receiveAsRecord();
+ }
+
+ @Path("/receive")
+ @GET
+ @Produces(MediaType.TEXT_PLAIN)
+ public String receive() {
+ return super.receive();
+ }
+
+ @Path("/receiveOperation")
+ @GET
+ @Produces(MediaType.TEXT_PLAIN)
+ public String receiveOperation() {
+ Record record = receiveAsRecord();
+
+ if (record == null) {
+ return null;
+ }
+ return record.getOperation();
+ }
+
+ @Override
+ String getEndpoinUrl(String hostname, String port, String username, String password, String databaseServerName,
+ String offsetStorageFileName) {
+ return Type.mongodb.getComponent() + ":localhost?"
+ + "offsetStorageFileName=" + offsetStorageFileName
+ + "&mongodbUser=" + System.getProperty(Type.mongodb.getPropertyUsername())
+ + "&mongodbPassword=" + System.getProperty(Type.mongodb.getPropertyPassword())
+ + "&mongodbName=docker-rs"
+ + "&mongodbHosts=" + hostname + ":" + port;
+ }
+}
diff --git a/integration-tests/debezium/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/Type.java b/integration-tests/debezium/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/Type.java
index 13c046e..503e6a0 100644
--- a/integration-tests/debezium/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/Type.java
+++ b/integration-tests/debezium/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/Type.java
@@ -23,7 +23,7 @@ package org.apache.camel.quarkus.component.debezium.common.it;
*/
public enum Type {
- postgres, mysql, sqlserver;
+ postgres, mysql, sqlserver, mongodb;
/** name of the camel component */
public String getComponent() {
diff --git a/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/AbstractDebeziumTest.java b/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/AbstractDebeziumTest.java
index 8438b85..26b0c30 100644
--- a/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/AbstractDebeziumTest.java
+++ b/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/AbstractDebeziumTest.java
@@ -39,10 +39,10 @@ import static org.hamcrest.Matchers.is;
public abstract class AbstractDebeziumTest {
private static final Logger LOG = Logger.getLogger(AbstractDebeziumTest.class);
- private static String COMPANY_1 = "Best Company";
- private static String COMPANY_2 = "Even Better Company";
- private static String CITY_1 = "Prague";
- private static String CITY_2 = "Paris";
+ protected static String COMPANY_1 = "Best Company";
+ protected static String COMPANY_2 = "Even Better Company";
+ protected static String CITY_1 = "Prague";
+ protected static String CITY_2 = "Paris";
public static int REPEAT_COUNT = 3;
/**
@@ -63,16 +63,12 @@ public abstract class AbstractDebeziumTest {
@Test
@Order(1)
public void testInsert() throws SQLException {
- if (getConnection() == null) {
- LOG.warn("Test 'testInsert' is skipped, because container is not running.");
- return;
- }
+ isInitialized("Test 'testInsert' is skipped, because container is not running.");
int i = 0;
while (i++ < REPEAT_COUNT) {
- executeUpdate(String.format("INSERT INTO %s (name, city) VALUES ('%s', '%s')", getCompanyTableName(),
- COMPANY_1 + "_" + i, CITY_1));
+ insertCompany(COMPANY_1 + "_" + i, CITY_1);
Response response = receiveResponse();
@@ -94,13 +90,19 @@ public abstract class AbstractDebeziumTest {
i < REPEAT_COUNT);
}
+ protected void isInitialized(String s) {
+ Assert.assertNotNull(s, getConnection());
+ }
+
+ protected void insertCompany(String name, String city) throws SQLException {
+ executeUpdate(String.format("INSERT INTO %s (name, city) VALUES ('%s', '%s')", getCompanyTableName(),
+ name, city));
+ }
+
@Test
@Order(2)
public void testUpdate() throws SQLException {
- if (getConnection() == null) {
- LOG.warn("Test 'testUpdate' is skipped, because container is not running.");
- return;
- }
+ isInitialized("Test 'testUpdate' is skipped, because container is not running.");
executeUpdate(String.format("INSERT INTO %s (name, city) VALUES ('%s', '%s')", getCompanyTableName(),
COMPANY_2, CITY_2));
@@ -120,10 +122,7 @@ public abstract class AbstractDebeziumTest {
@Test
@Order(3)
public void testDelete() throws SQLException {
- if (getConnection() == null) {
- LOG.warn("Test 'testDelete' is skipped, because container is not running.");
- return;
- }
+ isInitialized("Test 'testDelete' is skipped, because container is not running.");
int res = executeUpdate("DELETE FROM " + getCompanyTableName());
int i = 0;
@@ -148,6 +147,12 @@ public abstract class AbstractDebeziumTest {
.body(stringMatcher);
}
+ protected void receiveResponse(int statusCode, Matcher<String> stringMatcher, String methodName) {
+ receiveResponse(methodName).then()
+ .statusCode(statusCode)
+ .body(stringMatcher);
+ }
+
protected int executeUpdate(String sql) throws SQLException {
try (Statement statement = getConnection().createStatement()) {
return statement.executeUpdate(sql);
diff --git a/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/AbstractDebeziumTestResource.java b/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/AbstractDebeziumTestResource.java
index 86be357..68f168a 100644
--- a/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/AbstractDebeziumTestResource.java
+++ b/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/AbstractDebeziumTestResource.java
@@ -61,16 +61,19 @@ public abstract class AbstractDebeziumTestResource<T extends GenericContainer> i
container = createContainer();
- container.start();
+ startContainer();
Map<String, String> map = CollectionHelper.mapOf(
type.getPropertyHostname(), container.getContainerIpAddress(),
type.getPropertyPort(), container.getMappedPort(getPort()) + "",
- type.getPropertyUsername(), getUsername(),
- type.getPropertyPassword(), getPassword(),
type.getPropertyOffsetFileName(), storeFile.toString(),
type.getPropertyJdbc(), getJdbcUrl());
+ if (getUsername() != null) {
+ map.put(type.getPropertyUsername(), getUsername());
+ map.put(type.getPropertyPassword(), getPassword());
+ }
+
return map;
} catch (Exception e) {
@@ -79,6 +82,10 @@ public abstract class AbstractDebeziumTestResource<T extends GenericContainer> i
}
}
+ protected void startContainer() throws Exception {
+ container.start();
+ }
+
@Override
public void stop() {
try {
diff --git a/extensions-jvm/debezium-mongodb/integration-test/src/test/java/org/apache/camel/quarkus/component/debezium/mongodb/it/DebeziumMongodbTest.java b/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/mongodb/DebeziumMongodbIT.java
similarity index 63%
rename from extensions-jvm/debezium-mongodb/integration-test/src/test/java/org/apache/camel/quarkus/component/debezium/mongodb/it/DebeziumMongodbTest.java
rename to integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/mongodb/DebeziumMongodbIT.java
index f14194a..25b04d6 100644
--- a/extensions-jvm/debezium-mongodb/integration-test/src/test/java/org/apache/camel/quarkus/component/debezium/mongodb/it/DebeziumMongodbTest.java
+++ b/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/mongodb/DebeziumMongodbIT.java
@@ -14,21 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.quarkus.component.debezium.mongodb.it;
+package org.apache.camel.quarkus.component.debezium.common.it.mongodb;
-import io.quarkus.test.junit.QuarkusTest;
-import io.restassured.RestAssured;
-import org.junit.jupiter.api.Test;
+import io.quarkus.test.junit.NativeImageTest;
-@QuarkusTest
-class DebeziumMongodbTest {
-
- @Test
- public void loadComponentDebeziumMongodb() {
- /* A simple autogenerated test */
- RestAssured.get("/debezium-mongodb/load/component/debezium-mongodb")
- .then()
- .statusCode(200);
- }
+@NativeImageTest
+class DebeziumMongodbIT extends DebeziumMongodbTest {
}
diff --git a/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/mongodb/DebeziumMongodbTest.java b/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/mongodb/DebeziumMongodbTest.java
new file mode 100644
index 0000000..dc7c161
--- /dev/null
+++ b/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/mongodb/DebeziumMongodbTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.debezium.common.it.mongodb;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+import com.mongodb.client.MongoClient;
+import com.mongodb.client.MongoClients;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.result.DeleteResult;
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+import org.apache.camel.quarkus.component.debezium.common.it.AbstractDebeziumTest;
+import org.apache.camel.quarkus.component.debezium.common.it.Type;
+import org.bson.Document;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+
+@QuarkusTest
+@QuarkusTestResource(DebeziumMongodbTestResource.class)
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+class DebeziumMongodbTest extends AbstractDebeziumTest {
+ private static final Logger LOG = Logger.getLogger(DebeziumMongodbTest.class);
+
+ //constant with value of Type.mongodb.getJdbcProperty
+ public static final String PROPERTY_JDBC = "mongodb_jdbc";
+
+ private static MongoClient mongoClient;
+
+ private static MongoCollection companies;
+
+ public DebeziumMongodbTest() {
+ super(Type.mongodb);
+ }
+
+ @BeforeAll
+ public static void setUp() throws SQLException {
+ final String mongoUrl = System.getProperty(Type.mongodb.getPropertyJdbc());
+
+ if (mongoUrl != null) {
+ mongoClient = MongoClients.create(mongoUrl);
+ } else {
+ LOG.warn("Container is not running. Connection is not created.");
+ }
+
+ org.junit.Assume.assumeTrue(mongoClient != null);
+
+ MongoDatabase db = mongoClient.getDatabase("test");
+
+ companies = db.getCollection("companies");
+ }
+
+ @Before
+ public void before() {
+ org.junit.Assume.assumeTrue(mongoClient != null);
+ }
+
+ @AfterAll
+ public static void cleanUp() throws SQLException {
+ if (mongoClient != null) {
+ mongoClient.close();
+ }
+ }
+
+ @Override
+ protected Connection getConnection() {
+ throw new IllegalStateException("Not used");
+ }
+
+ @Override
+ protected String getCompanyTableName() {
+ throw new IllegalStateException("Not used");
+ }
+
+ @Test
+ @Order(0)
+ @EnabledIfSystemProperty(named = PROPERTY_JDBC, matches = ".*")
+ public void testReceiveInit() {
+ receiveResponse()
+ .then()
+ .statusCode(200)
+ .body(containsString("init"));
+ }
+
+ @Override
+ protected void insertCompany(String name, String city) {
+ Document doc = new Document();
+ doc.put("name", name);
+ doc.put("city", city);
+ companies.insertOne(doc);
+ }
+
+ @Override
+ protected void isInitialized(String s) {
+ Assert.assertNotNull(s, mongoClient);
+ }
+
+ @Test
+ @Order(1)
+ @EnabledIfSystemProperty(named = PROPERTY_JDBC, matches = ".*")
+ public void testInsert() throws SQLException {
+ super.testInsert();
+ }
+
+ @Test
+ @Order(2)
+ @EnabledIfSystemProperty(named = PROPERTY_JDBC, matches = ".*")
+ public void testUpdate() throws SQLException {
+ Document doc = new Document().append("name", COMPANY_2).append("city", CITY_2);
+ companies.insertOne(doc);
+
+ //validate that event is received
+ receiveResponse(200, containsString(COMPANY_2));
+
+ Document searchQuery = new Document().append("name", COMPANY_2);
+ Document updateQuery = new Document().append("$set", new Document().append("city", CITY_2 + "_changed"));
+ companies.updateMany(searchQuery, updateQuery);
+
+ //validate that event for create is in queue
+ receiveResponse(200, containsString(CITY_2 + "_changed"));
+ }
+
+ @Test
+ @Order(3)
+ @EnabledIfSystemProperty(named = PROPERTY_JDBC, matches = ".*")
+ public void testDelete() throws SQLException {
+ DeleteResult dr = companies.deleteMany(new Document().append("name", COMPANY_2));
+ Assert.assertEquals("Only one company should be deleted.", 1, dr.getDeletedCount());
+
+ //validate that event for delete is in queue
+ receiveResponse(200, equalTo("d"), "/receiveOperation");
+ }
+}
diff --git a/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/mongodb/DebeziumMongodbTestResource.java b/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/mongodb/DebeziumMongodbTestResource.java
new file mode 100644
index 0000000..f604aa4
--- /dev/null
+++ b/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/mongodb/DebeziumMongodbTestResource.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.quarkus.component.debezium.common.it.mongodb;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import org.apache.camel.quarkus.component.debezium.common.it.AbstractDebeziumTestResource;
+import org.apache.camel.quarkus.component.debezium.common.it.Type;
+import org.jboss.logging.Logger;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.wait.strategy.Wait;
+
+public class DebeziumMongodbTestResource extends AbstractDebeziumTestResource<GenericContainer> {
+ private static final Logger LOG = Logger.getLogger(DebeziumMongodbTestResource.class);
+
+ private static final String PRIVATE_HOST = "mongodb_private";
+ private static final String DB_USERNAME = "debezium";
+ private static final String DB_PASSWORD = "dbz";
+ private static int DB_PORT = 27017;
+
+ public DebeziumMongodbTestResource() {
+ super(Type.mongodb);
+ }
+
+ private Network net = Network.newNetwork();;
+
+ @Override
+ protected GenericContainer createContainer() {
+ return new GenericContainer("mongo")
+ .withExposedPorts(DB_PORT)
+ .withCommand("--replSet", "my-mongo-set")
+ .withNetwork(net)
+ .withNetworkAliases(PRIVATE_HOST)
+ .waitingFor(
+ Wait.forLogMessage(".*Waiting for connections.*", 1));
+
+ }
+
+ @Override
+ protected void startContainer() throws Exception {
+ super.startContainer();
+
+ execScriptInContainer("initMongodb.txt");
+ }
+
+ private void execScriptInContainer(String script) throws Exception {
+ String cmd = new String(Files.readAllBytes(Paths.get(getClass().getResource("/" + script).toURI())));
+ String[] cmds = cmd.split("\\n\\n");
+
+ for (int i = 0; i < cmds.length; i++) {
+ Container.ExecResult er = container.execInContainer(new String[] { "mongo", "--eval", cmds[i] });
+ }
+ }
+
+ @Override
+ protected String getJdbcUrl() {
+ final String jdbcUrl = String.format("mongodb://%s:%s@%s:%d", DB_USERNAME, DB_PASSWORD, container.getHost(),
+ container.getMappedPort(DB_PORT));
+
+ return jdbcUrl;
+ }
+
+ @Override
+ protected String getUsername() {
+ return DB_USERNAME;
+ }
+
+ @Override
+ protected String getPassword() {
+ return DB_PASSWORD;
+ }
+
+ @Override
+ protected int getPort() {
+ return DB_PORT;
+ }
+}
diff --git a/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/sqlserver/DebeziumSqlserverTestResource.java b/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/sqlserver/DebeziumSqlserverTestResource.java
index 2fee946..e32258d 100644
--- a/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/sqlserver/DebeziumSqlserverTestResource.java
+++ b/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/sqlserver/DebeziumSqlserverTestResource.java
@@ -29,6 +29,7 @@ import org.apache.camel.quarkus.component.debezium.common.it.DebeziumSqlserverRe
import org.apache.camel.quarkus.component.debezium.common.it.Type;
import org.jboss.logging.Logger;
import org.testcontainers.containers.MSSQLServerContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
public class DebeziumSqlserverTestResource extends AbstractDebeziumTestResource<MSSQLServerContainer> {
private static final Logger LOG = Logger.getLogger(DebeziumSqlserverTestResource.class);
@@ -44,7 +45,9 @@ public class DebeziumSqlserverTestResource extends AbstractDebeziumTestResource<
@Override
protected MSSQLServerContainer createContainer() {
return new MSSQLServerContainer<>().withEnv(Collections.singletonMap("MSSQL_AGENT_ENABLED", "True"))
- .withInitScript("initSqlserver.sql");
+ .withInitScript("initSqlserver.sql")
+ .waitingFor(
+ Wait.forLogMessage(".*xp_sqlagent_notify.*", 1));
}
@Override
diff --git a/integration-tests/debezium/src/test/resources/initMongodb.txt b/integration-tests/debezium/src/test/resources/initMongodb.txt
new file mode 100644
index 0000000..d9cfc06
--- /dev/null
+++ b/integration-tests/debezium/src/test/resources/initMongodb.txt
@@ -0,0 +1,36 @@
+rs.initiate( {
+ '_id' : 'my-mongo-set',
+ 'members' : [{
+ '_id' : 0,
+ 'host' : 'mongodb_private:27017',
+ 'priority': 2}]
+ });
+
+db.getSiblingDB('admin').runCommand( {
+ createRole: 'listDatabases',
+ privileges: [ {
+ resource: { cluster : true },
+ actions: [ 'listDatabases']
+ } ] ,
+ roles: []
+});
+
+db.getSiblingDB('admin').createUser({
+ user: "debezium",
+ pwd:"dbz",
+ roles: [ {
+ role: "userAdminAnyDatabase",
+ db: "admin"
+ }, {
+ role: "dbAdminAnyDatabase",
+ db: "admin"
+ }, {
+ role: "readWriteAnyDatabase",
+ db:"admin"
+ }, {
+ role: "clusterAdmin",
+ db: "admin"
+ }]
+});
+
+db.test.insert({'name':'init'})
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index ab17719..1f33bbc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,7 +56,7 @@
<commons-lang.version>2.6</commons-lang.version><!-- used by hbase, should be pretty stable as commons-lang is not developed actively anymore -->
<commons-math3.version>3.6.1</commons-math3.version><!-- Mess in the transitive dependencies of Spark and hbase-testing-util -->
<curator.version>4.3.0</curator.version><!-- Mess in the transitive dependencies of Spark, Zookeeper and other hadoop related components -->
- <debezium.version>1.3.0.Final</debezium.version>
+ <debezium.version>1.4.0.Final</debezium.version>
<derby.version>10.15.2.0</derby.version><!-- Spark -->
<freemarker.version>2.3.30</freemarker.version>
<fommil.netlib.core.version>1.1.2</fommil.netlib.core.version><!-- Mess in Weka transitive deps -->
diff --git a/poms/bom-test/pom.xml b/poms/bom-test/pom.xml
index 76ccd47..ad0f9d2 100644
--- a/poms/bom-test/pom.xml
+++ b/poms/bom-test/pom.xml
@@ -159,6 +159,11 @@
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
+ <artifactId>mongodb</artifactId>
+ <version>${testcontainers.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>${testcontainers.version}</version>
</dependency>
diff --git a/poms/bom/pom.xml b/poms/bom/pom.xml
index 60f6143..c531eab 100644
--- a/poms/bom/pom.xml
+++ b/poms/bom/pom.xml
@@ -705,12 +705,6 @@
<groupId>org.apache.camel</groupId>
<artifactId>camel-debezium-mongodb</artifactId>
<version>${camel.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.mongodb</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
@@ -5586,6 +5580,11 @@
</dependency>
<dependency>
<groupId>io.debezium</groupId>
+ <artifactId>debezium-connector-mongodb</artifactId>
+ <version>${debezium.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${debezium.version}</version>
</dependency>