You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2020/12/15 11:51:29 UTC
[camel-kafka-connector] 02/02: Fixed the timeout issues on
Couchbase Sink test with URL
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch camel-master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit adbda2fee8bf3ad5491b2967b60be3a9d77c331f
Author: Otavio Rodolfo Piske <op...@redhat.com>
AuthorDate: Tue Dec 15 11:08:13 2020 +0100
Fixed the timeout issues on Couchbase Sink test with URL
---
tests/itests-couchbase/pom.xml | 29 ++++++-
.../services/CouchbaseLocalContainerService.java | 89 ----------------------
.../couchbase/services/CouchbaseRemoteService.java | 58 --------------
.../couchbase/services/CouchbaseService.java | 52 -------------
.../services/CouchbaseServiceFactory.java | 46 -----------
.../couchbase/sink/CamelSinkCouchbaseITCase.java | 46 +++++++++--
6 files changed, 66 insertions(+), 254 deletions(-)
diff --git a/tests/itests-couchbase/pom.xml b/tests/itests-couchbase/pom.xml
index 7023380..0445019 100644
--- a/tests/itests-couchbase/pom.xml
+++ b/tests/itests-couchbase/pom.xml
@@ -39,13 +39,15 @@
<dependency>
<groupId>org.apache.camel</groupId>
- <artifactId>camel-couchbase</artifactId>
+ <artifactId>camel-test-infra-couchbase</artifactId>
+ <version>${camel.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
</dependency>
<dependency>
- <groupId>org.testcontainers</groupId>
- <artifactId>couchbase</artifactId>
- <scope>test</scope>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-couchbase</artifactId>
</dependency>
<dependency>
@@ -54,4 +56,23 @@
<scope>test</scope>
</dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <configuration>
+ <argLine>${common.failsafe.args}</argLine>
+ <skipTests>${skipIntegrationTests}</skipTests>
+ <!--
+ These tests are flaky and depend on some fragile timeout logic on Couchbase
+ -->
+ <rerunFailingTestsCount>2</rerunFailingTestsCount>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+
</project>
\ No newline at end of file
diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseLocalContainerService.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseLocalContainerService.java
deleted file mode 100644
index c4e0fbc..0000000
--- a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseLocalContainerService.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.couchbase.services;
-
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.couchbase.BucketDefinition;
-import org.testcontainers.couchbase.CouchbaseContainer;
-
-
-public class CouchbaseLocalContainerService implements CouchbaseService {
-
- /*
- * Couchbase container uses a dynamic port for the KV service. The configuration
- * used in the Camel component tries to use that port by default and it seems
- * we cannot configure it. Therefore, we override the default container and
- * force the default KV port to be used.
- */
- private class CustomCouchbaseContainer extends CouchbaseContainer {
- public CustomCouchbaseContainer() {
- final int kvPort = 11210;
- addFixedExposedPort(kvPort, kvPort);
- }
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(CouchbaseLocalContainerService.class);
- private BucketDefinition bucketDefinition = new BucketDefinition("mybucket");
- private CouchbaseContainer container;
-
- public CouchbaseLocalContainerService() {
- container = new CustomCouchbaseContainer()
- .withBucket(bucketDefinition);
- }
-
-
- @Override
- public String getConnectionString() {
- return container.getConnectionString();
- }
-
-
- @Override
- public String getUsername() {
- return container.getUsername();
- }
-
- @Override
- public String getPassword() {
- return container.getPassword();
- }
-
- @Override
- public String getHostname() {
- return container.getHost();
- }
-
- @Override
- public int getPort() {
- return container.getBootstrapHttpDirectPort();
- }
-
- @Override
- public void initialize() {
- container.start();
-
- LOG.debug("Couchbase container running at {}", getConnectionString());
- }
-
- @Override
- public void shutdown() {
- container.stop();
- }
-}
diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseRemoteService.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseRemoteService.java
deleted file mode 100644
index d720aa7..0000000
--- a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseRemoteService.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.couchbase.services;
-
-public class CouchbaseRemoteService implements CouchbaseService {
- @Override
- public String getConnectionString() {
- final int kvPort = 11210;
- return String.format("couchbase://%s:%d", getHostname(), kvPort);
- }
-
- @Override
- public String getUsername() {
- return System.getProperty("couchbase.username", "Administrator");
- }
-
- @Override
- public String getPassword() {
- return System.getProperty("couchbase.password");
- }
-
- @Override
- public String getHostname() {
- return System.getProperty("couchbase.hostname");
- }
-
- @Override
- public int getPort() {
- String portValue = System.getProperty("couchbase.port", "8091");
-
- return Integer.parseInt(portValue);
- }
-
- @Override
- public void initialize() {
-
- }
-
- @Override
- public void shutdown() {
-
- }
-}
diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseService.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseService.java
deleted file mode 100644
index 2dcd477..0000000
--- a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseService.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.couchbase.services;
-
-import org.junit.jupiter.api.extension.AfterAllCallback;
-import org.junit.jupiter.api.extension.BeforeAllCallback;
-import org.junit.jupiter.api.extension.ExtensionContext;
-
-public interface CouchbaseService extends BeforeAllCallback, AfterAllCallback {
-
- String getConnectionString();
- String getUsername();
- String getPassword();
- String getHostname();
- int getPort();
-
-
- /**
- * Perform any initialization necessary
- */
- void initialize();
-
- /**
- * Shuts down the service after the test has completed
- */
- void shutdown();
-
- @Override
- default void afterAll(ExtensionContext extensionContext) throws Exception {
- shutdown();
- }
-
- @Override
- default void beforeAll(ExtensionContext extensionContext) throws Exception {
- initialize();
- }
-}
diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseServiceFactory.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseServiceFactory.java
deleted file mode 100644
index 4a0c63c..0000000
--- a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/services/CouchbaseServiceFactory.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.camel.kafkaconnector.couchbase.services;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public final class CouchbaseServiceFactory {
- private static final Logger LOG = LoggerFactory.getLogger(CouchbaseServiceFactory.class);
-
- private CouchbaseServiceFactory() {
-
- }
-
- public static CouchbaseService getService() {
- String instanceType = System.getProperty("couchbase.instance.type");
-
- if (instanceType == null || instanceType.equals("local-couchbase-instance")) {
- return new CouchbaseLocalContainerService();
- }
-
- if (instanceType.equals("remote")) {
- return new CouchbaseRemoteService();
- }
-
- LOG.error("Couchbase instance must be one of 'local-couchbase-container' or 'remote");
- throw new UnsupportedOperationException("Invalid Couchbase instance type");
- }
-
-}
-
diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java
index 02d46c9..1de1bbc 100644
--- a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java
+++ b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java
@@ -26,6 +26,10 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import com.couchbase.client.core.diagnostics.EndpointPingReport;
+import com.couchbase.client.core.diagnostics.PingResult;
+import com.couchbase.client.core.diagnostics.PingState;
+import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.json.JsonObject;
import com.couchbase.client.java.manager.bucket.BucketSettings;
@@ -35,13 +39,15 @@ import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
import org.apache.camel.kafkaconnector.common.utils.TestUtils;
-import org.apache.camel.kafkaconnector.couchbase.services.CouchbaseService;
-import org.apache.camel.kafkaconnector.couchbase.services.CouchbaseServiceFactory;
+import org.apache.camel.test.infra.couchbase.services.CouchbaseService;
+import org.apache.camel.test.infra.couchbase.services.CouchbaseServiceFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +56,11 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+/*
+ This test is slow and flaky. It tends to fail on systems with limited resources and slow I/O. Therefore, it is
+ disabled by default.
+ */
+@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
@RegisterExtension
public static CouchbaseService service = CouchbaseServiceFactory.getService();
@@ -73,13 +84,38 @@ public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
bucketName = "testBucket" + TestUtils.randomWithRange(0, 100);
cluster = Cluster.connect(service.getConnectionString(), service.getUsername(), service.getPassword());
- cluster.ping();
+ cluster.ping().endpoints().entrySet().forEach(this::checkEndpoints);
LOG.debug("Creating a new bucket named {}", bucketName);
+
cluster.buckets().createBucket(BucketSettings.create(bucketName));
+ PingResult pingResult = cluster.bucket(bucketName).ping();
+ pingResult.endpoints().entrySet().forEach(this::checkEndpoints);
+
LOG.debug("Bucket created");
- topic = TestUtils.getDefaultTestTopic(this.getClass());
+ topic = TestUtils.getDefaultTestTopic(this.getClass()) + TestUtils.randomWithRange(0, 100);
+
+ try {
+ String startDelay = System.getProperty("couchbase.test.start.delay", "1000");
+
+ int delay = Integer.parseInt(startDelay);
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupted();
+ }
+ }
+
+ private void checkEndpoints(Map.Entry<ServiceType, List<EndpointPingReport>> entries) {
+ entries.getValue().forEach(this::checkStatus);
+ }
+
+ private void checkStatus(EndpointPingReport endpointPingReport) {
+ if (endpointPingReport.state() == PingState.OK) {
+ LOG.debug("Endpoint {} is ok", endpointPingReport.id());
+ } else {
+ LOG.warn("Endpoint {} is not OK", endpointPingReport.id());
+ }
}
@AfterEach
@@ -186,7 +222,7 @@ public class CamelSinkCouchbaseITCase extends AbstractKafkaTest {
runTest(factory);
}
- @Test
+ @RepeatedTest(10)
@Timeout(90)
public void testBasicSendReceiveUsingUrl() throws Exception {
ConnectorPropertyFactory factory = CamelCouchbasePropertyFactory.basic()