You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2020/10/22 13:41:08 UTC

[camel-kafka-connector] branch master updated: Added integration test for Couchbase sink

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 9da36a0  Added integration test for Couchbase sink
9da36a0 is described below

commit 9da36a0fc2541b708efb1c49402c200ed0d624a9
Author: Otavio R. Piske <an...@gmail.com>
AuthorDate: Sun Jun 21 13:53:49 2020 +0200

    Added integration test for Couchbase sink
---
 parent/pom.xml                                     |  13 ++
 tests/itests-couchbase/pom.xml                     |  59 ++++++
 .../services/CouchbaseLocalContainerService.java   |  89 +++++++++
 .../couchbase/services/CouchbaseRemoteService.java |  58 ++++++
 .../couchbase/services/CouchbaseService.java       |  52 ++++++
 .../services/CouchbaseServiceFactory.java          |  46 +++++
 .../sink/CamelCouchbasePropertyFactory.java        |  71 ++++++++
 .../couchbase/sink/CamelSinkCouchbaseITCase.java   | 200 +++++++++++++++++++++
 tests/pom.xml                                      |   1 +
 9 files changed, 589 insertions(+)

diff --git a/parent/pom.xml b/parent/pom.xml
index 4d78385..1e1cd0d 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -270,6 +270,12 @@
                 <scope>test</scope>
             </dependency>
             <dependency>
+                <groupId>org.testcontainers</groupId>
+                <artifactId>couchbase</artifactId>
+                <version>${testcontainers-version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
                 <groupId>org.apache.qpid</groupId>
                 <artifactId>qpid-jms-client</artifactId>
                 <version>${qpid-jms-client-version}</version>
@@ -326,6 +332,13 @@
                 <version>${testcontainers-version}</version>
                 <scope>test</scope>
             </dependency>
+            <dependency>
+                <groupId>com.couchbase.client</groupId>
+                <artifactId>java-client</artifactId>
+                <version>${couchbase-client-version}</version>
+                <scope>test</scope>
+            </dependency>
+
         </dependencies>
     </dependencyManagement>
     <build>
diff --git a/tests/itests-couchbase/pom.xml b/tests/itests-couchbase/pom.xml
new file mode 100644
index 0000000..a305ede
--- /dev/null
+++ b/tests/itests-couchbase/pom.xml
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.camel.kafkaconnector</groupId>
+        <artifactId>itests-parent</artifactId>
+        <version>0.6.0-SNAPSHOT</version>
+        <relativePath>../itests-parent/pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>itests-couchbase</artifactId>
+    <name>Camel-Kafka-Connector :: Tests :: Couchbase</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel.kafkaconnector</groupId>
+            <artifactId>itests-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-couchbase</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>couchbase</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.couchbase.client</groupId>
+            <artifactId>java-client</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseLocalContainerService.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseLocalContainerService.java
new file mode 100644
index 0000000..c4e0fbc
--- /dev/null
+++ b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseLocalContainerService.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.couchbase.services;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.couchbase.BucketDefinition;
+import org.testcontainers.couchbase.CouchbaseContainer;
+
+
+public class CouchbaseLocalContainerService implements CouchbaseService {
+
+    /*
+     * Couchbase container uses a dynamic port for the KV service. The configuration
+     * used in the Camel component tries to use that port by default and it seems
+     * we cannot configure it. Therefore, we override the default container and
+     * force the default KV port to be used.
+     */
+    private class CustomCouchbaseContainer extends CouchbaseContainer {
+        public CustomCouchbaseContainer() {
+            final int kvPort = 11210;
+            addFixedExposedPort(kvPort, kvPort);
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(CouchbaseLocalContainerService.class);
+    private BucketDefinition bucketDefinition = new BucketDefinition("mybucket");
+    private CouchbaseContainer container;
+
+    public CouchbaseLocalContainerService() {
+        container = new CustomCouchbaseContainer()
+                .withBucket(bucketDefinition);
+    }
+
+
+    @Override
+    public String getConnectionString() {
+        return container.getConnectionString();
+    }
+
+
+    @Override
+    public String getUsername() {
+        return container.getUsername();
+    }
+
+    @Override
+    public String getPassword() {
+        return container.getPassword();
+    }
+
+    @Override
+    public String getHostname() {
+        return container.getHost();
+    }
+
+    @Override
+    public int getPort() {
+        return container.getBootstrapHttpDirectPort();
+    }
+
+    @Override
+    public void initialize() {
+        container.start();
+
+        LOG.debug("Couchbase container running at {}", getConnectionString());
+    }
+
+    @Override
+    public void shutdown() {
+        container.stop();
+    }
+}
diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseRemoteService.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseRemoteService.java
new file mode 100644
index 0000000..d720aa7
--- /dev/null
+++ b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseRemoteService.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.couchbase.services;
+
+public class CouchbaseRemoteService implements CouchbaseService {
+    @Override
+    public String getConnectionString() {
+        final int kvPort = 11210;
+        return String.format("couchbase://%s:%d", getHostname(), kvPort);
+    }
+
+    @Override
+    public String getUsername() {
+        return System.getProperty("couchbase.username", "Administrator");
+    }
+
+    @Override
+    public String getPassword() {
+        return System.getProperty("couchbase.password");
+    }
+
+    @Override
+    public String getHostname() {
+        return System.getProperty("couchbase.hostname");
+    }
+
+    @Override
+    public int getPort() {
+        String portValue = System.getProperty("couchbase.port", "8091");
+
+        return Integer.parseInt(portValue);
+    }
+
+    @Override
+    public void initialize() {
+
+    }
+
+    @Override
+    public void shutdown() {
+
+    }
+}
diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseService.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseService.java
new file mode 100644
index 0000000..2dcd477
--- /dev/null
+++ b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseService.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.couchbase.services;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+public interface CouchbaseService extends BeforeAllCallback, AfterAllCallback {
+
+    String getConnectionString();
+    String getUsername();
+    String getPassword();
+    String getHostname();
+    int getPort();
+
+
+    /**
+     * Perform any initialization necessary
+     */
+    void initialize();
+
+    /**
+     * Shuts down the service after the test has completed
+     */
+    void shutdown();
+
+    @Override
+    default void afterAll(ExtensionContext extensionContext) throws Exception {
+        shutdown();
+    }
+
+    @Override
+    default void beforeAll(ExtensionContext extensionContext) throws Exception {
+        initialize();
+    }
+}
diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseServiceFactory.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseServiceFactory.java
new file mode 100644
index 0000000..4a0c63c
--- /dev/null
+++ b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseServiceFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.couchbase.services;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class CouchbaseServiceFactory {
+    private static final Logger LOG = LoggerFactory.getLogger(CouchbaseServiceFactory.class);
+
+    private CouchbaseServiceFactory() {
+
+    }
+
+    public static CouchbaseService getService() {
+        String instanceType = System.getProperty("couchbase.instance.type");
+
+        if (instanceType == null || instanceType.equals("local-couchbase-instance")) {
+            return new CouchbaseLocalContainerService();
+        }
+
+        if (instanceType.equals("remote")) {
+            return new CouchbaseRemoteService();
+        }
+
+        LOG.error("Couchbase instance must be one of 'local-couchbase-container' or 'remote");
+        throw new UnsupportedOperationException("Invalid Couchbase instance type");
+    }
+
+}
+
diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelCouchbasePropertyFactory.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelCouchbasePropertyFactory.java
new file mode 100644
index 0000000..dd0e9d9
--- /dev/null
+++ b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelCouchbasePropertyFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.couchbase.sink;
+
+import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+
+public class CamelCouchbasePropertyFactory extends SinkConnectorPropertyFactory<CamelCouchbasePropertyFactory> {
+
+    public CamelCouchbasePropertyFactory withProtocol(String value) {
+        return setProperty("camel.sink.path.protocol", value);
+    }
+
+    public CamelCouchbasePropertyFactory withHostname(String value) {
+        return setProperty("camel.sink.path.hostname", value);
+    }
+
+    public CamelCouchbasePropertyFactory withPort(int value) {
+        return setProperty("camel.sink.path.port", value);
+    }
+
+    public CamelCouchbasePropertyFactory withBucket(String value) {
+        return setProperty("camel.sink.endpoint.bucket", value);
+    }
+
+    public CamelCouchbasePropertyFactory withCollection(String value) {
+        return setProperty("camel.sink.endpoint.collection", value);
+    }
+
+    public CamelCouchbasePropertyFactory withOperation(String value) {
+        return setProperty("camel.sink.endpoint.operation", value);
+    }
+
+    public CamelCouchbasePropertyFactory withUsername(String value) {
+        return setProperty("camel.sink.endpoint.username", value);
+    }
+
+    public CamelCouchbasePropertyFactory withPassword(String value) {
+        return setProperty("camel.sink.endpoint.password", value);
+    }
+
+    public EndpointUrlBuilder<CamelCouchbasePropertyFactory> withUrl(String protocol, String hostname, int port, String bucket) {
+        String sinkUrl = String.format("couchbase:%s://%s:%d/%s", protocol, hostname, port, bucket);
+
+        return new EndpointUrlBuilder<>(this::withSinkUrl, sinkUrl);
+    }
+
+    public static CamelCouchbasePropertyFactory basic() {
+        return new CamelCouchbasePropertyFactory()
+                    .withTasksMax(1)
+                    .withName("CamelCouchbaseSinkConnector")
+                    .withConnectorClass("org.apache.camel.kafkaconnector.couchbase.CamelCouchbaseSinkConnector")
+                    .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                    .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+}
diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java
new file mode 100644
index 0000000..fd51530
--- /dev/null
+++ b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.couchbase.sink;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import com.couchbase.client.java.Cluster;
+import com.couchbase.client.java.json.JsonObject;
+import com.couchbase.client.java.manager.bucket.BucketSettings;
+import com.couchbase.client.java.query.QueryResult;
+import org.apache.camel.kafkaconnector.CamelSinkTask;
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.kafkaconnector.couchbase.services.CouchbaseService;
+import org.apache.camel.kafkaconnector.couchbase.services.CouchbaseServiceFactory;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
+    @RegisterExtension
+    public static CouchbaseService service = CouchbaseServiceFactory.getService();
+
+    private static final Logger LOG = LoggerFactory.getLogger(CamelSinkCouchbaseITCase.class);
+
+    private String bucketName;
+    private String topic;
+
+    private Cluster cluster;
+
+    private final int expect = 10;
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-couchbase-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() {
+        bucketName = "testBucket" + TestUtils.randomWithRange(0, 100);
+        cluster = Cluster.connect(service.getConnectionString(), service.getUsername(), service.getPassword());
+
+        LOG.debug("Creating a new bucket named {}", bucketName);
+        cluster.buckets().createBucket(BucketSettings.create(bucketName));
+        LOG.debug("Bucket created");
+
+        topic = TestUtils.getDefaultTestTopic(this.getClass());
+    }
+
+    @AfterEach
+    public void tearDown() {
+        LOG.debug("Dropping the test bucket named {}", bucketName);
+        cluster.buckets().dropBucket(bucketName);
+        LOG.debug("Bucket dropped");
+
+        cluster.disconnect();
+    }
+
+    private void produceMessages(CountDownLatch latch) {
+        KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
+
+        try {
+            for (int i = 0; i < expect; i++) {
+                Map<String, String> parameters = new HashMap<>();
+
+                parameters.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CCB_ID", String.valueOf(i));
+
+                JsonObject jsonObject = JsonObject.create().put("data", String.format("test-%d", i));
+
+                try {
+                    kafkaClient.produce(topic, jsonObject.toString(), parameters);
+                } catch (ExecutionException e) {
+                    LOG.error("Unable to produce messages: {}", e.getMessage(), e);
+                } catch (InterruptedException e) {
+                    break;
+                }
+            }
+        } finally {
+            latch.countDown();
+        }
+    }
+
+    private boolean waitForMinimumRecordCount() {
+        try {
+            String query = String.format("select count(*) as count from `%s`", bucketName);
+            QueryResult queryResult = cluster.query(query);
+            List<JsonObject> results = queryResult.rowsAsObject();
+
+            if (results.isEmpty()) {
+                return false;
+            }
+
+            int size = results.get(0).getInt("count");
+            if (size < expect) {
+                LOG.info("There are only {} records at the moment", size);
+
+                return false;
+            }
+
+            return size == expect;
+        } catch (Exception e) {
+            LOG.warn("Exception while waiting for the records to arrive: {}", e.getMessage(), e);
+        }
+
+        return false;
+    }
+
+    private void verifyRecords() {
+        String query = String.format("select * from `%s` USE KEYS \"1\"", bucketName);
+        QueryResult queryResult = cluster.query(query);
+
+        List<JsonObject> results = queryResult.rowsAsObject();
+
+        assertFalse(results.isEmpty(), "There should be at least 1 record on the result");
+        LOG.debug("Received record: {}", results.get(0));
+    }
+
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws Exception {
+        connectorPropertyFactory.log();
+        getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
+
+        LOG.debug("Creating the producer and sending messages ...");
+        ExecutorService service = Executors.newCachedThreadPool();
+
+        CountDownLatch latch = new CountDownLatch(1);
+        service.submit(() -> produceMessages(latch));
+
+        assertTrue(TestUtils.waitFor(this::waitForMinimumRecordCount));
+
+        LOG.debug("Waiting for the test to complete");
+        if (latch.await(110, TimeUnit.SECONDS)) {
+            verifyRecords();
+        } else {
+            fail("Failed to receive the records within the specified time");
+        }
+    }
+
+    @Disabled("Not formatting the URL correctly - issue #629")
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceive() throws Exception {
+        ConnectorPropertyFactory factory = CamelCouchbasePropertyFactory.basic()
+                .withTopics(topic)
+                .withBucket(bucketName)
+                .withProtocol("http")
+                .withHostname(service.getHostname())
+                .withPort(service.getPort())
+                .withUsername(service.getUsername())
+                .withPassword(service.getPassword());
+
+        runTest(factory);
+    }
+
+    @Test
+    @Timeout(90)
+    public void testBasicSendReceiveUsingUrl() throws Exception {
+        ConnectorPropertyFactory factory = CamelCouchbasePropertyFactory.basic()
+                .withTopics(topic)
+                .withUrl("http", service.getHostname(), service.getPort(), bucketName)
+                    .append("username", service.getUsername())
+                    .append("password", service.getPassword())
+                    .buildUrl();
+
+
+        runTest(factory);
+    }
+}
diff --git a/tests/pom.xml b/tests/pom.xml
index dea3592..e2568c6 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -61,6 +61,7 @@
         <module>itests-azure-storage-queue</module>
         <module>perf-tests-rabbitmq</module>
         <module>itests-rabbitmq</module>
+        <module>itests-couchbase</module>
     </modules>