You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2020/12/15 11:51:29 UTC

[camel-kafka-connector] 02/02: Fixed the timeout issues on Couchbase Sink test with URL

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit adbda2fee8bf3ad5491b2967b60be3a9d77c331f
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Tue Dec 15 11:08:13 2020 +0100

    Fixed the timeout issues on Couchbase Sink test with URL
---
 tests/itests-couchbase/pom.xml                     | 29 ++++++-
 .../services/CouchbaseLocalContainerService.java   | 89 ----------------------
 .../couchbase/services/CouchbaseRemoteService.java | 58 --------------
 .../couchbase/services/CouchbaseService.java       | 52 -------------
 .../services/CouchbaseServiceFactory.java          | 46 -----------
 .../couchbase/sink/CamelSinkCouchbaseITCase.java   | 46 +++++++++--
 6 files changed, 66 insertions(+), 254 deletions(-)

diff --git a/tests/itests-couchbase/pom.xml b/tests/itests-couchbase/pom.xml
index 7023380..0445019 100644
--- a/tests/itests-couchbase/pom.xml
+++ b/tests/itests-couchbase/pom.xml
@@ -39,13 +39,15 @@
 
         <dependency>
             <groupId>org.apache.camel</groupId>
-            <artifactId>camel-couchbase</artifactId>
+            <artifactId>camel-test-infra-couchbase</artifactId>
+            <version>${camel.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
         </dependency>
 
         <dependency>
-            <groupId>org.testcontainers</groupId>
-            <artifactId>couchbase</artifactId>
-            <scope>test</scope>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-couchbase</artifactId>
         </dependency>
 
         <dependency>
@@ -54,4 +56,23 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <configuration>
+                    <argLine>${common.failsafe.args}</argLine>
+                    <skipTests>${skipIntegrationTests}</skipTests>
+                    <!--
+                    These tests are flaky and depend on some fragile timeout logic on Couchbase
+                     -->
+                    <rerunFailingTestsCount>2</rerunFailingTestsCount>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+
 </project>
\ No newline at end of file
diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseLocalContainerService.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseLocalContainerService.java
deleted file mode 100644
index c4e0fbc..0000000
--- a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseLocalContainerService.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.couchbase.services;
-
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.couchbase.BucketDefinition;
-import org.testcontainers.couchbase.CouchbaseContainer;
-
-
-public class CouchbaseLocalContainerService implements CouchbaseService {
-
-    /*
-     * Couchbase container uses a dynamic port for the KV service. The configuration
-     * used in the Camel component tries to use that port by default and it seems
-     * we cannot configure it. Therefore, we override the default container and
-     * force the default KV port to be used.
-     */
-    private class CustomCouchbaseContainer extends CouchbaseContainer {
-        public CustomCouchbaseContainer() {
-            final int kvPort = 11210;
-            addFixedExposedPort(kvPort, kvPort);
-        }
-    }
-
-    private static final Logger LOG = LoggerFactory.getLogger(CouchbaseLocalContainerService.class);
-    private BucketDefinition bucketDefinition = new BucketDefinition("mybucket");
-    private CouchbaseContainer container;
-
-    public CouchbaseLocalContainerService() {
-        container = new CustomCouchbaseContainer()
-                .withBucket(bucketDefinition);
-    }
-
-
-    @Override
-    public String getConnectionString() {
-        return container.getConnectionString();
-    }
-
-
-    @Override
-    public String getUsername() {
-        return container.getUsername();
-    }
-
-    @Override
-    public String getPassword() {
-        return container.getPassword();
-    }
-
-    @Override
-    public String getHostname() {
-        return container.getHost();
-    }
-
-    @Override
-    public int getPort() {
-        return container.getBootstrapHttpDirectPort();
-    }
-
-    @Override
-    public void initialize() {
-        container.start();
-
-        LOG.debug("Couchbase container running at {}", getConnectionString());
-    }
-
-    @Override
-    public void shutdown() {
-        container.stop();
-    }
-}
diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseRemoteService.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseRemoteService.java
deleted file mode 100644
index d720aa7..0000000
--- a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseRemoteService.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.couchbase.services;
-
-public class CouchbaseRemoteService implements CouchbaseService {
-    @Override
-    public String getConnectionString() {
-        final int kvPort = 11210;
-        return String.format("couchbase://%s:%d", getHostname(), kvPort);
-    }
-
-    @Override
-    public String getUsername() {
-        return System.getProperty("couchbase.username", "Administrator");
-    }
-
-    @Override
-    public String getPassword() {
-        return System.getProperty("couchbase.password");
-    }
-
-    @Override
-    public String getHostname() {
-        return System.getProperty("couchbase.hostname");
-    }
-
-    @Override
-    public int getPort() {
-        String portValue = System.getProperty("couchbase.port", "8091");
-
-        return Integer.parseInt(portValue);
-    }
-
-    @Override
-    public void initialize() {
-
-    }
-
-    @Override
-    public void shutdown() {
-
-    }
-}
diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseService.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseService.java
deleted file mode 100644
index 2dcd477..0000000
--- a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseService.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.couchbase.services;
-
-import org.junit.jupiter.api.extension.AfterAllCallback;
-import org.junit.jupiter.api.extension.BeforeAllCallback;
-import org.junit.jupiter.api.extension.ExtensionContext;
-
-public interface CouchbaseService extends BeforeAllCallback, AfterAllCallback {
-
-    String getConnectionString();
-    String getUsername();
-    String getPassword();
-    String getHostname();
-    int getPort();
-
-
-    /**
-     * Perform any initialization necessary
-     */
-    void initialize();
-
-    /**
-     * Shuts down the service after the test has completed
-     */
-    void shutdown();
-
-    @Override
-    default void afterAll(ExtensionContext extensionContext) throws Exception {
-        shutdown();
-    }
-
-    @Override
-    default void beforeAll(ExtensionContext extensionContext) throws Exception {
-        initialize();
-    }
-}
diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseServiceFactory.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseServiceFactory.java
deleted file mode 100644
index 4a0c63c..0000000
--- a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseServiceFactory.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.couchbase.services;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public final class CouchbaseServiceFactory {
-    private static final Logger LOG = LoggerFactory.getLogger(CouchbaseServiceFactory.class);
-
-    private CouchbaseServiceFactory() {
-
-    }
-
-    public static CouchbaseService getService() {
-        String instanceType = System.getProperty("couchbase.instance.type");
-
-        if (instanceType == null || instanceType.equals("local-couchbase-instance")) {
-            return new CouchbaseLocalContainerService();
-        }
-
-        if (instanceType.equals("remote")) {
-            return new CouchbaseRemoteService();
-        }
-
-        LOG.error("Couchbase instance must be one of 'local-couchbase-container' or 'remote");
-        throw new UnsupportedOperationException("Invalid Couchbase instance type");
-    }
-
-}
-
diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java
index 02d46c9..1de1bbc 100644
--- a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java
+++ b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java
@@ -26,6 +26,10 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+import com.couchbase.client.core.diagnostics.EndpointPingReport;
+import com.couchbase.client.core.diagnostics.PingResult;
+import com.couchbase.client.core.diagnostics.PingState;
+import com.couchbase.client.core.service.ServiceType;
 import com.couchbase.client.java.Cluster;
 import com.couchbase.client.java.json.JsonObject;
 import com.couchbase.client.java.manager.bucket.BucketSettings;
@@ -35,13 +39,15 @@ import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
-import org.apache.camel.kafkaconnector.couchbase.services.CouchbaseService;
-import org.apache.camel.kafkaconnector.couchbase.services.CouchbaseServiceFactory;
+import org.apache.camel.test.infra.couchbase.services.CouchbaseService;
+import org.apache.camel.test.infra.couchbase.services.CouchbaseServiceFactory;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,6 +56,11 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
+/*
+ This test is slow and flaky. It tends to fail on systems with limited resources and slow I/O. Therefore, it is
+ disabled by default.
+ */
+@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
 public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
     @RegisterExtension
     public static CouchbaseService service = CouchbaseServiceFactory.getService();
@@ -73,13 +84,38 @@ public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
         bucketName = "testBucket" + TestUtils.randomWithRange(0, 100);
         cluster = Cluster.connect(service.getConnectionString(), service.getUsername(), service.getPassword());
 
-        cluster.ping();
+        cluster.ping().endpoints().entrySet().forEach(this::checkEndpoints);
 
         LOG.debug("Creating a new bucket named {}", bucketName);
+
         cluster.buckets().createBucket(BucketSettings.create(bucketName));
+        PingResult pingResult = cluster.bucket(bucketName).ping();
+        pingResult.endpoints().entrySet().forEach(this::checkEndpoints);
+
         LOG.debug("Bucket created");
 
-        topic = TestUtils.getDefaultTestTopic(this.getClass());
+        topic = TestUtils.getDefaultTestTopic(this.getClass()) + TestUtils.randomWithRange(0, 100);
+
+        try {
+            String startDelay = System.getProperty("couchbase.test.start.delay", "1000");
+
+            int delay = Integer.parseInt(startDelay);
+            Thread.sleep(delay);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupted();
+        }
+    }
+
+    private void checkEndpoints(Map.Entry<ServiceType, List<EndpointPingReport>> entries) {
+        entries.getValue().forEach(this::checkStatus);
+    }
+
+    private void checkStatus(EndpointPingReport endpointPingReport) {
+        if (endpointPingReport.state() == PingState.OK) {
+            LOG.debug("Endpoint {} is ok", endpointPingReport.id());
+        } else {
+            LOG.warn("Endpoint {} is not OK", endpointPingReport.id());
+        }
     }
 
     @AfterEach
@@ -186,7 +222,7 @@ public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
         runTest(factory);
     }
 
-    @Test
+    @RepeatedTest(10)
     @Timeout(90)
     public void testBasicSendReceiveUsingUrl() throws Exception {
         ConnectorPropertyFactory factory = CamelCouchbasePropertyFactory.basic()