You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2021/03/30 10:56:31 UTC
[camel-kafka-connector] 02/02: chore: test flakyness mitigation.
This is an automated email from the ASF dual-hosted git repository.
valdar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 923b4be9c94b9d21f826becbe6619e2f16828931
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Tue Mar 30 11:47:25 2021 +0200
chore: test flakyness mitigation.
---
.../cxf/sink/CamelSinkCXFITCase.java | 1 +
.../cxf/source/CamelSourceCXFITCase.java | 6 +-
.../cxfrs/source/CamelSourceCXFRSITCase.java | 81 +++-------------------
3 files changed, 13 insertions(+), 75 deletions(-)
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java
index 6690341..04364b9 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java
@@ -100,6 +100,7 @@ public class CamelSinkCXFITCase extends CamelSinkTestSupport {
}
@Test
+ @Timeout(90)
public void testBasicSendReceiveUsingUrl() throws Exception {
InputStream stream = this.getClass().getResource("/hello-service-test.xml").openStream();
String testMessage = IOUtils.toString(stream, Charset.defaultCharset());
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java
index b9f04c2..20e6a3d 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java
@@ -91,7 +91,7 @@ public class CamelSourceCXFITCase extends CamelSourceTestSupport {
@Test
- @Timeout(20)
+ @Timeout(30)
public void testBasicSendReceive() throws ExecutionException, InterruptedException {
String topicName = getTopicForTest(this);
@@ -105,7 +105,7 @@ public class CamelSourceCXFITCase extends CamelSourceTestSupport {
}
@Test
- @Timeout(20)
+ @Timeout(30)
public void testBasicSendReceiveUsingUrl() throws ExecutionException, InterruptedException {
String topicName = getTopicForTest(this);
@@ -119,7 +119,7 @@ public class CamelSourceCXFITCase extends CamelSourceTestSupport {
}
@Test
- @Timeout(20)
+ @Timeout(30)
public void testBasicSendReceiveUsingDataFormat() throws ExecutionException, InterruptedException {
String topicName = getTopicForTest(this);
diff --git a/tests/itests-cxfrs/src/test/java/org/apache/camel/kafkaconnector/cxfrs/source/CamelSourceCXFRSITCase.java b/tests/itests-cxfrs/src/test/java/org/apache/camel/kafkaconnector/cxfrs/source/CamelSourceCXFRSITCase.java
index e7372fd..c51f6f9 100644
--- a/tests/itests-cxfrs/src/test/java/org/apache/camel/kafkaconnector/cxfrs/source/CamelSourceCXFRSITCase.java
+++ b/tests/itests-cxfrs/src/test/java/org/apache/camel/kafkaconnector/cxfrs/source/CamelSourceCXFRSITCase.java
@@ -46,26 +46,19 @@ import static org.junit.jupiter.api.Assertions.fail;
* messages
*/
public class CamelSourceCXFRSITCase extends CamelSourceTestSupport {
-
+
protected static final String LOCALHOST = NetworkUtils.getHostname();
- protected static final int PORT = NetworkUtils.getFreePort(LOCALHOST);
+ protected static final int PORT = NetworkUtils.getFreePort(LOCALHOST, 20000, 30000);
protected static final String CXT = PORT + "/CxfRsConsumerTest";
protected static final String CXF_RS_ENDPOINT_ADDRESS = "http://" + LOCALHOST + ":" + CXT + "/rest";
- protected static final String CXF_RS_ENDPOINT_URI = CXF_RS_ENDPOINT_ADDRESS
- + "?resourceClasses=org.apache.camel.kafkaconnector.cxfrs.CustomerServiceResource";
-
-
private static String[] receivedValue = {"[126]", "[123]", "[400]"};
-
private static final Logger LOG = LoggerFactory.getLogger(CamelSourceCXFRSITCase.class);
private int received;
private final int expect = 3;
-
-
@Override
protected String[] getConnectorsInTest() {
return new String[] {"camel-cxfrs-kafka-connector"};
@@ -74,18 +67,14 @@ public class CamelSourceCXFRSITCase extends CamelSourceTestSupport {
@BeforeEach
public void setUp() {
received = 0;
-
}
-
-
@Override
protected void produceTestData() {
TestUtils.waitFor(() -> NetworkUtils.portIsOpen(LOCALHOST, PORT));
-
try {
Bus bus = BusFactory.newInstance().createBus();
-
+
bus.getInInterceptors().add(new LoggingInInterceptor());
bus.getOutInterceptors().add(new LoggingOutInterceptor());
try {
@@ -93,8 +82,6 @@ public class CamelSourceCXFRSITCase extends CamelSourceTestSupport {
} catch (Exception e) {
LOG.info("Test Invocation Failure", e);
}
-
-
} catch (Exception e) {
LOG.info("Unable to invoke service: {}", e.getMessage(), e);
fail("Unable to invoke service");
@@ -104,24 +91,19 @@ public class CamelSourceCXFRSITCase extends CamelSourceTestSupport {
@Override
protected void verifyMessages(TestMessageConsumer<?> consumer) {
LOG.info("Consumed messages: {}", consumer.consumedMessages());
-
+
for (ConsumerRecord<String, ?> record : consumer.consumedMessages()) {
Object receivedObject = consumer.consumedMessages().get(received).value();
if (!(receivedObject instanceof String)) {
fail("Unexpected message type");
}
-
String result = (String) receivedObject;
assertEquals(receivedValue[received++], result);
-
-
}
}
-
-
@Test
- @Timeout(20)
+ @Timeout(30)
public void testBasicSendReceive() {
try {
String topicName = getTopicForTest(this);
@@ -130,8 +112,6 @@ public class CamelSourceCXFRSITCase extends CamelSourceTestSupport {
.withKafkaTopic(topicName)
.withAddress(CXF_RS_ENDPOINT_ADDRESS)
.withResourceClass("org.apache.camel.kafkaconnector.cxfrs.CustomerServiceResource");
-
-
runTestBlocking(connectorPropertyFactory, topicName, expect);
} catch (Exception e) {
@@ -139,49 +119,7 @@ public class CamelSourceCXFRSITCase extends CamelSourceTestSupport {
fail(e.getMessage());
}
}
-
- @Test
- @Timeout(20)
- public void testBasicSendReceiveWithoutProcessor() {
- try {
-
- String topicName = getTopicForTest(this);
- ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFRSPropertyFactory
- .basic()
- .withKafkaTopic(topicName)
- .withAddress(CXF_RS_ENDPOINT_ADDRESS)
- .withResourceClass("org.apache.camel.kafkaconnector.cxfrs.CustomerServiceResource");
-
-
- runTestBlocking(connectorPropertyFactory, topicName, expect);
- } catch (Exception e) {
- LOG.error("CXF test failed: {}", e.getMessage(), e);
- fail(e.getMessage());
- }
- }
-
- @Test
- @Timeout(20)
- public void testBasicSendReceiveUsingUrl() {
- try {
- String topicName = getTopicForTest(this);
- ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFRSPropertyFactory
- .basic()
- .withKafkaTopic(topicName)
- .withUrl(CXF_RS_ENDPOINT_URI).buildUrl();
-
-
-
- runTestBlocking(connectorPropertyFactory, topicName, expect);
- } catch (Exception e) {
- LOG.error("CXF test failed: {}", e.getMessage(), e);
- fail(e.getMessage());
- }
- }
-
-
-
private void invokeGetCustomer(String uri, String expect) throws Exception {
HttpGet get = new HttpGet(uri);
get.addHeader("Accept", "application/json");
@@ -189,7 +127,7 @@ public class CamelSourceCXFRSITCase extends CamelSourceTestSupport {
try {
HttpResponse response = httpclient.execute(get);
-
+
} finally {
httpclient.close();
}
@@ -197,12 +135,11 @@ public class CamelSourceCXFRSITCase extends CamelSourceTestSupport {
private void doTestGetCustomer(String contextUri) throws Exception {
invokeGetCustomer("http://" + LOCALHOST + ":" + CXT + "/" + contextUri + "/customerservice/customers/126",
- "{\"Customer\":{\"id\":126,\"name\":\"CKC\"}}");
+ "{\"Customer\":{\"id\":126,\"name\":\"CKC\"}}");
invokeGetCustomer("http://" + LOCALHOST + ":" + CXT + "/" + contextUri + "/customerservice/customers/123",
- "customer response back!");
+ "customer response back!");
invokeGetCustomer("http://" + LOCALHOST + ":" + CXT + "/" + contextUri + "/customerservice/customers/400",
- "The remoteAddress is 127.0.0.1");
+ "The remoteAddress is 127.0.0.1");
}
-
}