You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by va...@apache.org on 2021/03/30 10:56:31 UTC

[camel-kafka-connector] 02/02: chore: test flakyness mitigation.

This is an automated email from the ASF dual-hosted git repository.

valdar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 923b4be9c94b9d21f826becbe6619e2f16828931
Author: Andrea Tarocchi <an...@gmail.com>
AuthorDate: Tue Mar 30 11:47:25 2021 +0200

    chore: test flakyness mitigation.
---
 .../cxf/sink/CamelSinkCXFITCase.java               |  1 +
 .../cxf/source/CamelSourceCXFITCase.java           |  6 +-
 .../cxfrs/source/CamelSourceCXFRSITCase.java       | 81 +++-------------------
 3 files changed, 13 insertions(+), 75 deletions(-)

diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java
index 6690341..04364b9 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/sink/CamelSinkCXFITCase.java
@@ -100,6 +100,7 @@ public class CamelSinkCXFITCase extends CamelSinkTestSupport {
     }
 
     @Test
+    @Timeout(90)
     public void testBasicSendReceiveUsingUrl() throws Exception {
         InputStream stream = this.getClass().getResource("/hello-service-test.xml").openStream();
         String testMessage = IOUtils.toString(stream, Charset.defaultCharset());
diff --git a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java
index b9f04c2..20e6a3d 100644
--- a/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java
+++ b/tests/itests-cxf/src/test/java/org/apache/camel/kafkaconnector/cxf/source/CamelSourceCXFITCase.java
@@ -91,7 +91,7 @@ public class CamelSourceCXFITCase extends CamelSourceTestSupport {
 
 
     @Test
-    @Timeout(20)
+    @Timeout(30)
     public void testBasicSendReceive() throws ExecutionException, InterruptedException {
         String topicName = getTopicForTest(this);
 
@@ -105,7 +105,7 @@ public class CamelSourceCXFITCase extends CamelSourceTestSupport {
     }
 
     @Test
-    @Timeout(20)
+    @Timeout(30)
     public void testBasicSendReceiveUsingUrl() throws ExecutionException, InterruptedException {
         String topicName = getTopicForTest(this);
 
@@ -119,7 +119,7 @@ public class CamelSourceCXFITCase extends CamelSourceTestSupport {
     }
 
     @Test
-    @Timeout(20)
+    @Timeout(30)
     public void testBasicSendReceiveUsingDataFormat() throws ExecutionException, InterruptedException {
         String topicName = getTopicForTest(this);
 
diff --git a/tests/itests-cxfrs/src/test/java/org/apache/camel/kafkaconnector/cxfrs/source/CamelSourceCXFRSITCase.java b/tests/itests-cxfrs/src/test/java/org/apache/camel/kafkaconnector/cxfrs/source/CamelSourceCXFRSITCase.java
index e7372fd..c51f6f9 100644
--- a/tests/itests-cxfrs/src/test/java/org/apache/camel/kafkaconnector/cxfrs/source/CamelSourceCXFRSITCase.java
+++ b/tests/itests-cxfrs/src/test/java/org/apache/camel/kafkaconnector/cxfrs/source/CamelSourceCXFRSITCase.java
@@ -46,26 +46,19 @@ import static org.junit.jupiter.api.Assertions.fail;
  * messages
  */
 public class CamelSourceCXFRSITCase extends CamelSourceTestSupport {
-    
+
     protected static final String LOCALHOST = NetworkUtils.getHostname();
-    protected static final int PORT = NetworkUtils.getFreePort(LOCALHOST);
+    protected static final int PORT = NetworkUtils.getFreePort(LOCALHOST, 20000, 30000);
     protected static final String CXT = PORT + "/CxfRsConsumerTest";
     protected static final String CXF_RS_ENDPOINT_ADDRESS = "http://" + LOCALHOST + ":" + CXT + "/rest";
-    protected static final String CXF_RS_ENDPOINT_URI =  CXF_RS_ENDPOINT_ADDRESS
-        + "?resourceClasses=org.apache.camel.kafkaconnector.cxfrs.CustomerServiceResource";
 
-    
-            
     private static String[] receivedValue = {"[126]", "[123]", "[400]"};
 
-    
     private static final Logger LOG = LoggerFactory.getLogger(CamelSourceCXFRSITCase.class);
 
     private int received;
     private final int expect = 3;
-    
 
-    
     @Override
     protected String[] getConnectorsInTest() {
         return new String[] {"camel-cxfrs-kafka-connector"};
@@ -74,18 +67,14 @@ public class CamelSourceCXFRSITCase extends CamelSourceTestSupport {
     @BeforeEach
     public void setUp() {
         received = 0;
-        
     }
 
-    
-
     @Override
     protected void produceTestData() {
         TestUtils.waitFor(() -> NetworkUtils.portIsOpen(LOCALHOST, PORT));
-
         try {
             Bus bus = BusFactory.newInstance().createBus();
-            
+
             bus.getInInterceptors().add(new LoggingInInterceptor());
             bus.getOutInterceptors().add(new LoggingOutInterceptor());
             try {
@@ -93,8 +82,6 @@ public class CamelSourceCXFRSITCase extends CamelSourceTestSupport {
             } catch (Exception e) {
                 LOG.info("Test Invocation Failure", e);
             }
-            
-
         } catch (Exception e) {
             LOG.info("Unable to invoke service: {}", e.getMessage(), e);
             fail("Unable to invoke service");
@@ -104,24 +91,19 @@ public class CamelSourceCXFRSITCase extends CamelSourceTestSupport {
     @Override
     protected void verifyMessages(TestMessageConsumer<?> consumer) {
         LOG.info("Consumed messages: {}", consumer.consumedMessages());
-        
+
         for (ConsumerRecord<String, ?> record : consumer.consumedMessages()) {
             Object receivedObject = consumer.consumedMessages().get(received).value();
             if (!(receivedObject instanceof String)) {
                 fail("Unexpected message type");
             }
-
             String result = (String) receivedObject;
             assertEquals(receivedValue[received++], result);
-            
-            
         }
     }
 
-        
-
     @Test
-    @Timeout(20)
+    @Timeout(30)
     public void testBasicSendReceive() {
         try {
             String topicName = getTopicForTest(this);
@@ -130,8 +112,6 @@ public class CamelSourceCXFRSITCase extends CamelSourceTestSupport {
                     .withKafkaTopic(topicName)
                     .withAddress(CXF_RS_ENDPOINT_ADDRESS)
                     .withResourceClass("org.apache.camel.kafkaconnector.cxfrs.CustomerServiceResource");
-                  
-                                        
 
             runTestBlocking(connectorPropertyFactory, topicName, expect);
         } catch (Exception e) {
@@ -139,49 +119,7 @@ public class CamelSourceCXFRSITCase extends CamelSourceTestSupport {
             fail(e.getMessage());
         }
     }
-    
-    @Test
-    @Timeout(20)
-    public void testBasicSendReceiveWithoutProcessor() {
-        try {
-            
-            String topicName = getTopicForTest(this);
-            ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFRSPropertyFactory
-                    .basic()
-                    .withKafkaTopic(topicName)
-                    .withAddress(CXF_RS_ENDPOINT_ADDRESS)
-                    .withResourceClass("org.apache.camel.kafkaconnector.cxfrs.CustomerServiceResource");
-                    
-                                        
 
-            runTestBlocking(connectorPropertyFactory, topicName, expect);
-        } catch (Exception e) {
-            LOG.error("CXF test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
-    }
-    
-    @Test
-    @Timeout(20)
-    public void testBasicSendReceiveUsingUrl() {
-        try {
-            String topicName = getTopicForTest(this);
-            ConnectorPropertyFactory connectorPropertyFactory = CamelSourceCXFRSPropertyFactory
-                    .basic()
-                    .withKafkaTopic(topicName)
-                    .withUrl(CXF_RS_ENDPOINT_URI).buildUrl();
-                    
-                    
-
-            runTestBlocking(connectorPropertyFactory, topicName, expect);
-        } catch (Exception e) {
-            LOG.error("CXF test failed: {}", e.getMessage(), e);
-            fail(e.getMessage());
-        }
-    }
-
-    
-    
     private void invokeGetCustomer(String uri, String expect) throws Exception {
         HttpGet get = new HttpGet(uri);
         get.addHeader("Accept", "application/json");
@@ -189,7 +127,7 @@ public class CamelSourceCXFRSITCase extends CamelSourceTestSupport {
 
         try {
             HttpResponse response = httpclient.execute(get);
-            
+
         } finally {
             httpclient.close();
         }
@@ -197,12 +135,11 @@ public class CamelSourceCXFRSITCase extends CamelSourceTestSupport {
 
     private void doTestGetCustomer(String contextUri) throws Exception {
         invokeGetCustomer("http://" + LOCALHOST + ":" + CXT + "/" + contextUri + "/customerservice/customers/126",
-                          "{\"Customer\":{\"id\":126,\"name\":\"CKC\"}}");
+                "{\"Customer\":{\"id\":126,\"name\":\"CKC\"}}");
         invokeGetCustomer("http://" + LOCALHOST + ":" + CXT + "/" + contextUri + "/customerservice/customers/123",
-                          "customer response back!");
+                "customer response back!");
         invokeGetCustomer("http://" + LOCALHOST + ":" + CXT + "/" + contextUri + "/customerservice/customers/400",
-            "The remoteAddress is 127.0.0.1");
+                "The remoteAddress is 127.0.0.1");
 
     }
-    
 }