You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2020/10/22 13:41:08 UTC
[camel-kafka-connector] branch master updated: Added integration
test for Couchbase sink
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 9da36a0 Added integration test for Couchbase sink
9da36a0 is described below
commit 9da36a0fc2541b708efb1c49402c200ed0d624a9
Author: Otavio R. Piske <an...@gmail.com>
AuthorDate: Sun Jun 21 13:53:49 2020 +0200
Added integration test for Couchbase sink
---
parent/pom.xml | 13 ++
tests/itests-couchbase/pom.xml | 59 ++++++
.../services/CouchbaseLocalContainerService.java | 89 +++++++++
.../couchbase/services/CouchbaseRemoteService.java | 58 ++++++
.../couchbase/services/CouchbaseService.java | 52 ++++++
.../services/CouchbaseServiceFactory.java | 46 +++++
.../sink/CamelCouchbasePropertyFactory.java | 71 ++++++++
.../couchbase/sink/CamelSinkCouchbaseITCase.java | 200 +++++++++++++++++++++
tests/pom.xml | 1 +
9 files changed, 589 insertions(+)
diff --git a/parent/pom.xml b/parent/pom.xml
index 4d78385..1e1cd0d 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -270,6 +270,12 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>couchbase</artifactId>
+ <version>${testcontainers-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>${qpid-jms-client-version}</version>
@@ -326,6 +332,13 @@
<version>${testcontainers-version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.couchbase.client</groupId>
+ <artifactId>java-client</artifactId>
+ <version>${couchbase-client-version}</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</dependencyManagement>
<build>
diff --git a/tests/itests-couchbase/pom.xml b/tests/itests-couchbase/pom.xml
new file mode 100644
index 0000000..a305ede
--- /dev/null
+++ b/tests/itests-couchbase/pom.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <!-- Integration-test module for the Couchbase connector; inherits from itests-parent. -->
+ <parent>
+ <groupId>org.apache.camel.kafkaconnector</groupId>
+ <artifactId>itests-parent</artifactId>
+ <version>0.6.0-SNAPSHOT</version>
+ <relativePath>../itests-parent/pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>itests-couchbase</artifactId>
+ <name>Camel-Kafka-Connector :: Tests :: Couchbase</name>
+
+ <dependencies>
+ <!-- Test classes of the itests-common module, consumed as a test-jar. -->
+ <dependency>
+ <groupId>org.apache.camel.kafkaconnector</groupId>
+ <artifactId>itests-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- No version or scope declared here; presumably both are managed by a parent POM (TODO confirm). -->
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-couchbase</artifactId>
+ </dependency>
+
+ <!-- Testcontainers Couchbase module; version comes from the dependencyManagement entry in parent/pom.xml. -->
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>couchbase</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Couchbase Java SDK; version comes from the dependencyManagement entry in parent/pom.xml. -->
+ <dependency>
+ <groupId>com.couchbase.client</groupId>
+ <artifactId>java-client</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseLocalContainerService.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseLocalContainerService.java
new file mode 100644
index 0000000..c4e0fbc
--- /dev/null
+++ b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseLocalContainerService.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.couchbase.services;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.couchbase.BucketDefinition;
+import org.testcontainers.couchbase.CouchbaseContainer;
+
+
+/**
+ * {@link CouchbaseService} implementation backed by a Couchbase container that is
+ * started in {@link #initialize()} and stopped in {@link #shutdown()}.
+ */
+public class CouchbaseLocalContainerService implements CouchbaseService {
+    private static final Logger LOG = LoggerFactory.getLogger(CouchbaseLocalContainerService.class);
+
+    /*
+     * Couchbase container uses a dynamic port for the KV service. The configuration
+     * used in the Camel component tries to use that port by default and it seems
+     * we cannot configure it. Therefore, we override the default container and
+     * force the default KV port to be used.
+     *
+     * Declared static because it never touches the enclosing service instance, so
+     * there is no reason to carry a hidden reference to it.
+     */
+    private static class CustomCouchbaseContainer extends CouchbaseContainer {
+        public CustomCouchbaseContainer() {
+            final int kvPort = 11210;
+            // Bind the KV port to the same port number on the host
+            addFixedExposedPort(kvPort, kvPort);
+        }
+    }
+
+    // Bucket the container is configured with
+    private final BucketDefinition bucketDefinition = new BucketDefinition("mybucket");
+    private final CouchbaseContainer container;
+
+    /**
+     * Configures (but does not start) the container.
+     */
+    public CouchbaseLocalContainerService() {
+        container = new CustomCouchbaseContainer()
+            .withBucket(bucketDefinition);
+    }
+
+    /**
+     * @return the connection string reported by the container
+     */
+    @Override
+    public String getConnectionString() {
+        return container.getConnectionString();
+    }
+
+    /**
+     * @return the user name reported by the container
+     */
+    @Override
+    public String getUsername() {
+        return container.getUsername();
+    }
+
+    /**
+     * @return the password reported by the container
+     */
+    @Override
+    public String getPassword() {
+        return container.getPassword();
+    }
+
+    /**
+     * @return the host reported by the container
+     */
+    @Override
+    public String getHostname() {
+        return container.getHost();
+    }
+
+    /**
+     * @return the bootstrap HTTP direct port reported by the container
+     */
+    @Override
+    public int getPort() {
+        return container.getBootstrapHttpDirectPort();
+    }
+
+    /**
+     * Starts the container and logs its connection string.
+     */
+    @Override
+    public void initialize() {
+        container.start();
+
+        LOG.debug("Couchbase container running at {}", getConnectionString());
+    }
+
+    /**
+     * Stops the container.
+     */
+    @Override
+    public void shutdown() {
+        container.stop();
+    }
+}
diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseRemoteService.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseRemoteService.java
new file mode 100644
index 0000000..d720aa7
--- /dev/null
+++ b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseRemoteService.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.couchbase.services;
+
+/**
+ * {@link CouchbaseService} implementation for a Couchbase instance that is not managed by the
+ * test. All connection details are read from system properties; {@link #initialize()} and
+ * {@link #shutdown()} do nothing.
+ */
+public class CouchbaseRemoteService implements CouchbaseService {
+    // KV port used when building the connection string
+    private static final int KV_PORT = 11210;
+
+    /**
+     * @return a connection string of the form {@code couchbase://<hostname>:11210}
+     */
+    @Override
+    public String getConnectionString() {
+        // Concatenation instead of String.format("%d"): the formatter localizes digits for
+        // default locales whose zero digit is not '0', which would corrupt the URL.
+        return "couchbase://" + getHostname() + ":" + KV_PORT;
+    }
+
+    /**
+     * @return the value of the {@code couchbase.username} system property, or
+     *         {@code Administrator} if it is not set
+     */
+    @Override
+    public String getUsername() {
+        return System.getProperty("couchbase.username", "Administrator");
+    }
+
+    /**
+     * @return the value of the {@code couchbase.password} system property, or {@code null} if
+     *         it is not set
+     */
+    @Override
+    public String getPassword() {
+        return System.getProperty("couchbase.password");
+    }
+
+    /**
+     * @return the value of the {@code couchbase.hostname} system property, or {@code null} if
+     *         it is not set
+     */
+    @Override
+    public String getHostname() {
+        return System.getProperty("couchbase.hostname");
+    }
+
+    /**
+     * @return the value of the {@code couchbase.port} system property parsed as an integer,
+     *         or {@code 8091} if it is not set
+     * @throws NumberFormatException if the property value is not a valid integer
+     */
+    @Override
+    public int getPort() {
+        String portValue = System.getProperty("couchbase.port", "8091");
+
+        return Integer.parseInt(portValue);
+    }
+
+    /**
+     * No-op: the instance is not managed by this service.
+     */
+    @Override
+    public void initialize() {
+
+    }
+
+    /**
+     * No-op: the instance is not managed by this service.
+     */
+    @Override
+    public void shutdown() {
+
+    }
+}
diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseService.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseService.java
new file mode 100644
index 0000000..2dcd477
--- /dev/null
+++ b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseService.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.couchbase.services;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+/**
+ * Describes a Couchbase instance usable from the tests. Being both a {@link BeforeAllCallback}
+ * and an {@link AfterAllCallback}, an implementation is initialized in {@code beforeAll} and
+ * shut down in {@code afterAll}.
+ */
+public interface CouchbaseService extends BeforeAllCallback, AfterAllCallback {
+
+    /**
+     * Perform any initialization necessary
+     */
+    void initialize();
+
+    /**
+     * Shuts down the service after the test has completed
+     */
+    void shutdown();
+
+    /** @return the connection string for the instance */
+    String getConnectionString();
+
+    /** @return the user name for the instance */
+    String getUsername();
+
+    /** @return the password for the instance */
+    String getPassword();
+
+    /** @return the host name of the instance */
+    String getHostname();
+
+    /** @return the port of the instance */
+    int getPort();
+
+    /**
+     * Delegates to {@link #initialize()}.
+     */
+    @Override
+    default void beforeAll(ExtensionContext context) throws Exception {
+        initialize();
+    }
+
+    /**
+     * Delegates to {@link #shutdown()}.
+     */
+    @Override
+    default void afterAll(ExtensionContext context) throws Exception {
+        shutdown();
+    }
+}
diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseServiceFactory.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseServiceFactory.java
new file mode 100644
index 0000000..4a0c63c
--- /dev/null
+++ b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseServiceFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.couchbase.services;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates the {@link CouchbaseService} selected by the {@code couchbase.instance.type}
+ * system property.
+ */
+public final class CouchbaseServiceFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(CouchbaseServiceFactory.class);
+
+    private CouchbaseServiceFactory() {
+
+    }
+
+    /**
+     * Returns the service matching the {@code couchbase.instance.type} system property:
+     * <ul>
+     * <li>unset or {@code local-couchbase-instance}: a {@link CouchbaseLocalContainerService}</li>
+     * <li>{@code remote}: a {@link CouchbaseRemoteService}</li>
+     * </ul>
+     *
+     * @return a new service instance
+     * @throws UnsupportedOperationException if the property holds any other value
+     */
+    public static CouchbaseService getService() {
+        String instanceType = System.getProperty("couchbase.instance.type");
+
+        if (instanceType == null || instanceType.equals("local-couchbase-instance")) {
+            return new CouchbaseLocalContainerService();
+        }
+
+        if (instanceType.equals("remote")) {
+            return new CouchbaseRemoteService();
+        }
+
+        // The message names the values that are actually accepted above
+        LOG.error("Couchbase instance must be one of 'local-couchbase-instance' or 'remote' (got '{}')",
+            instanceType);
+        throw new UnsupportedOperationException("Invalid Couchbase instance type: " + instanceType);
+    }
+
+}
+
diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelCouchbasePropertyFactory.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelCouchbasePropertyFactory.java
new file mode 100644
index 0000000..dd0e9d9
--- /dev/null
+++ b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelCouchbasePropertyFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.couchbase.sink;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+
+/**
+ * Fluent builder for the properties of the Couchbase sink connector used in the tests.
+ */
+public class CamelCouchbasePropertyFactory extends SinkConnectorPropertyFactory<CamelCouchbasePropertyFactory> {
+
+    /** Sets {@code camel.sink.path.protocol}. */
+    public CamelCouchbasePropertyFactory withProtocol(String value) {
+        return setProperty("camel.sink.path.protocol", value);
+    }
+
+    /** Sets {@code camel.sink.path.hostname}. */
+    public CamelCouchbasePropertyFactory withHostname(String value) {
+        return setProperty("camel.sink.path.hostname", value);
+    }
+
+    /** Sets {@code camel.sink.path.port}. */
+    public CamelCouchbasePropertyFactory withPort(int value) {
+        return setProperty("camel.sink.path.port", value);
+    }
+
+    /** Sets {@code camel.sink.endpoint.bucket}. */
+    public CamelCouchbasePropertyFactory withBucket(String value) {
+        return setProperty("camel.sink.endpoint.bucket", value);
+    }
+
+    /** Sets {@code camel.sink.endpoint.collection}. */
+    public CamelCouchbasePropertyFactory withCollection(String value) {
+        return setProperty("camel.sink.endpoint.collection", value);
+    }
+
+    /** Sets {@code camel.sink.endpoint.operation}. */
+    public CamelCouchbasePropertyFactory withOperation(String value) {
+        return setProperty("camel.sink.endpoint.operation", value);
+    }
+
+    /** Sets {@code camel.sink.endpoint.username}. */
+    public CamelCouchbasePropertyFactory withUsername(String value) {
+        return setProperty("camel.sink.endpoint.username", value);
+    }
+
+    /** Sets {@code camel.sink.endpoint.password}. */
+    public CamelCouchbasePropertyFactory withPassword(String value) {
+        return setProperty("camel.sink.endpoint.password", value);
+    }
+
+    /**
+     * Starts building a sink URL of the form
+     * {@code couchbase:<protocol>://<hostname>:<port>/<bucket>}.
+     *
+     * @param protocol the protocol part of the URL
+     * @param hostname the host part of the URL
+     * @param port the port part of the URL
+     * @param bucket the bucket, used as the path of the URL
+     * @return a URL builder seeded with the base URL and bound to {@code withSinkUrl}
+     */
+    public EndpointUrlBuilder<CamelCouchbasePropertyFactory> withUrl(String protocol, String hostname, int port, String bucket) {
+        // Concatenation instead of String.format("%d"): the formatter localizes digits for
+        // default locales whose zero digit is not '0', which would corrupt the port in the URL.
+        String sinkUrl = "couchbase:" + protocol + "://" + hostname + ":" + port + "/" + bucket;
+
+        return new EndpointUrlBuilder<>(this::withSinkUrl, sinkUrl);
+    }
+
+    /**
+     * Creates a factory preset with one task, the connector name, the Couchbase sink connector
+     * class and string converters for both key and value.
+     */
+    public static CamelCouchbasePropertyFactory basic() {
+        return new CamelCouchbasePropertyFactory()
+            .withTasksMax(1)
+            .withName("CamelCouchbaseSinkConnector")
+            .withConnectorClass("org.apache.camel.kafkaconnector.couchbase.CamelCouchbaseSinkConnector")
+            .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+            .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+}
diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java
new file mode 100644
index 0000000..fd51530
--- /dev/null
+++ b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.couchbase.sink;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import com.couchbase.client.java.Cluster;
+import com.couchbase.client.java.json.JsonObject;
+import com.couchbase.client.java.manager.bucket.BucketSettings;
+import com.couchbase.client.java.query.QueryResult;
+import org.apache.camel.kafkaconnector.CamelSinkTask;
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.couchbase.services.CouchbaseService;
+import org.apache.camel.kafkaconnector.couchbase.services.CouchbaseServiceFactory;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
+ @RegisterExtension
+ public static CouchbaseService service = CouchbaseServiceFactory.getService();
+
+ private static final Logger LOG = LoggerFactory.getLogger(CamelSinkCouchbaseITCase.class);
+
+ private String bucketName;
+ private String topic;
+
+ private Cluster cluster;
+
+ private final int expect = 10;
+
+ @Override
+ protected String[] getConnectorsInTest() {
+ return new String[] {"camel-couchbase-kafka-connector"};
+ }
+
+ @BeforeEach
+ public void setUp() {
+ bucketName = "testBucket" + TestUtils.randomWithRange(0, 100);
+ cluster = Cluster.connect(service.getConnectionString(), service.getUsername(), service.getPassword());
+
+ LOG.debug("Creating a new bucket named {}", bucketName);
+ cluster.buckets().createBucket(BucketSettings.create(bucketName));
+ LOG.debug("Bucket created");
+
+ topic = TestUtils.getDefaultTestTopic(this.getClass());
+ }
+
+ @AfterEach
+ public void tearDown() {
+ LOG.debug("Dropping the test bucket named {}", bucketName);
+ cluster.buckets().dropBucket(bucketName);
+ LOG.debug("Bucket dropped");
+
+ cluster.disconnect();
+ }
+
+ private void produceMessages(CountDownLatch latch) {
+ KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+ try {
+ for (int i = 0; i < expect; i++) {
+ Map<String, String> parameters = new HashMap<>();
+
+ parameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CCB_ID", String.valueOf(i));
+
+ JsonObject jsonObject = JsonObject.create().put("data", String.format("test-%d", i));
+
+ try {
+ kafkaClient.produce(topic, jsonObject.toString(), parameters);
+ } catch (ExecutionException e) {
+ LOG.error("Unable to produce messages: {}", e.getMessage(), e);
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ } finally {
+ latch.countDown();
+ }
+ }
+
+ private boolean waitForMinimumRecordCount() {
+ try {
+ String query = String.format("select count(*) as count from `%s`", bucketName);
+ QueryResult queryResult = cluster.query(query);
+ List<JsonObject> results = queryResult.rowsAsObject();
+
+ if (results.isEmpty()) {
+ return false;
+ }
+
+ int size = results.get(0).getInt("count");
+ if (size < expect) {
+ LOG.info("There are only {} records at the moment", size);
+
+ return false;
+ }
+
+ return size == expect;
+ } catch (Exception e) {
+ LOG.warn("Exception while waiting for the records to arrive: {}", e.getMessage(), e);
+ }
+
+ return false;
+ }
+
+ private void verifyRecords() {
+ String query = String.format("select * from `%s` USE KEYS \"1\"", bucketName);
+ QueryResult queryResult = cluster.query(query);
+
+ List<JsonObject> results = queryResult.rowsAsObject();
+
+ assertFalse(results.isEmpty(), "There should be at least 1 record on the result");
+ LOG.debug("Received record: {}", results.get(0));
+ }
+
+ public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws Exception {
+ connectorPropertyFactory.log();
+ getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+
+ LOG.debug("Creating the producer and sending messages ...");
+ ExecutorService service = Executors.newCachedThreadPool();
+
+ CountDownLatch latch = new CountDownLatch(1);
+ service.submit(() -> produceMessages(latch));
+
+ assertTrue(TestUtils.waitFor(this::waitForMinimumRecordCount));
+
+ LOG.debug("Waiting for the test to complete");
+ if (latch.await(110, TimeUnit.SECONDS)) {
+ verifyRecords();
+ } else {
+ fail("Failed to receive the records within the specified time");
+ }
+ }
+
+ @Disabled("Not formatting the URL correctly - issue #629")
+ @Test
+ @Timeout(90)
+ public void testBasicSendReceive() throws Exception {
+ ConnectorPropertyFactory factory = CamelCouchbasePropertyFactory.basic()
+ .withTopics(topic)
+ .withBucket(bucketName)
+ .withProtocol("http")
+ .withHostname(service.getHostname())
+ .withPort(service.getPort())
+ .withUsername(service.getUsername())
+ .withPassword(service.getPassword());
+
+ runTest(factory);
+ }
+
+ @Test
+ @Timeout(90)
+ public void testBasicSendReceiveUsingUrl() throws Exception {
+ ConnectorPropertyFactory factory = CamelCouchbasePropertyFactory.basic()
+ .withTopics(topic)
+ .withUrl("http", service.getHostname(), service.getPort(), bucketName)
+ .append("username", service.getUsername())
+ .append("password", service.getPassword())
+ .buildUrl();
+
+
+ runTest(factory);
+ }
+}
diff --git a/tests/pom.xml b/tests/pom.xml
index dea3592..e2568c6 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -61,6 +61,7 @@
<module>itests-azure-storage-queue</module>
<module>perf-tests-rabbitmq</module>
<module>itests-rabbitmq</module>
+ <module>itests-couchbase</module>
</modules>