You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/08/10 16:26:43 UTC

[camel] 05/06: (chores) camel-kafka: ensure tests run a little bit faster

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch camel-3.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 28e1abba16c52b0b8d234ccc92c18110ca96e04a
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Thu Aug 4 09:12:38 2022 +0200

    (chores) camel-kafka: ensure tests run a little bit faster
    
    This ensures that the polls happen multiple times within the test execution window and also that the session timeout does not exceed it.
---
 .../processor/resume/kafka/MultiNodeKafkaResumeStrategy.java  | 11 -----------
 .../kafka/integration/KafkaConsumerAsyncManualCommitIT.java   |  2 +-
 .../kafka/integration/KafkaConsumerBadPortHealthCheckIT.java  |  2 +-
 .../KafkaConsumerBadPortSupervisingHealthCheckIT.java         |  2 +-
 .../component/kafka/integration/KafkaConsumerFullIT.java      |  2 +-
 .../kafka/integration/KafkaConsumerHealthCheckIT.java         |  2 +-
 .../kafka/integration/KafkaConsumerIdempotentIT.java          |  2 +-
 .../KafkaConsumerIdempotentWithCustomSerializerIT.java        |  2 +-
 .../integration/KafkaConsumerIdempotentWithProcessorIT.java   |  2 +-
 .../kafka/integration/KafkaConsumerTopicIsPatternIT.java      |  5 +++--
 .../integration/KafkaConsumerUnresolvableHealthCheckIT.java   |  2 +-
 .../kafka/integration/commit/KafkaConsumerNoopCommitIT.java   |  2 +-
 .../kafka/integration/commit/KafkaConsumerSyncCommitIT.java   |  2 +-
 .../pause/KafkaPausableConsumerCircuitBreakerIT.java          |  2 +-
 .../kafka/integration/pause/KafkaPausableConsumerIT.java      |  2 +-
 15 files changed, 16 insertions(+), 26 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
index 82580200f03..fd76c7a6046 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/resume/kafka/MultiNodeKafkaResumeStrategy.java
@@ -17,20 +17,10 @@
 
 package org.apache.camel.processor.resume.kafka;
 
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.Properties;
-import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import org.apache.camel.resume.Deserializable;
 import org.apache.camel.resume.Resumable;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,5 +57,4 @@ public class MultiNodeKafkaResumeStrategy<K extends Resumable> extends SingleNod
         super(resumeStrategyConfiguration, executorService);
     }
 
-
 }
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
index bfda5a5a5e8..6b94fb6454a 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
@@ -55,7 +55,7 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo
     private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerAsyncManualCommitIT.class);
 
     @EndpointInject("kafka:" + TOPIC
-                    + "?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false"
+                    + "?groupId=KafkaConsumerAsyncManualCommitIT&pollTimeoutMs=1000&autoCommitEnable=false"
                     + "&allowManualCommit=true&autoOffsetReset=earliest&kafkaManualCommitFactory=#testFactory")
     private Endpoint from;
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortHealthCheckIT.java
index 0ca75d0b2b4..c9a8402cf66 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortHealthCheckIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortHealthCheckIT.java
@@ -72,7 +72,7 @@ public class KafkaConsumerBadPortHealthCheckIT extends CamelTestSupport {
     @EndpointInject("kafka:" + TOPIC
                     + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
                     + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
-                    + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+                    + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
     private Endpoint from;
     @EndpointInject("mock:result")
     private MockEndpoint to;
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortSupervisingHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortSupervisingHealthCheckIT.java
index 31727cae3fd..d210fe298ad 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortSupervisingHealthCheckIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerBadPortSupervisingHealthCheckIT.java
@@ -74,7 +74,7 @@ public class KafkaConsumerBadPortSupervisingHealthCheckIT extends CamelTestSuppo
     @EndpointInject("kafka:" + TOPIC
                     + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
                     + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
-                    + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+                    + "&autoCommitIntervalMs=1000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
     private Endpoint from;
     @EndpointInject("mock:result")
     private MockEndpoint to;
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
index 46e304d0613..9b857342181 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerFullIT.java
@@ -59,7 +59,7 @@ public class KafkaConsumerFullIT extends BaseEmbeddedKafkaTestSupport {
     @EndpointInject("kafka:" + TOPIC
                     + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
                     + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
-                    + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+                    + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
     private Endpoint from;
 
     @EndpointInject("mock:result")
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java
index bfac711c265..74fbc8fdbf6 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java
@@ -79,7 +79,7 @@ public class KafkaConsumerHealthCheckIT extends CamelTestSupport {
     @EndpointInject("kafka:" + TOPIC
                     + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
                     + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
-                    + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+                    + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
     private Endpoint from;
     @EndpointInject("mock:result")
     private MockEndpoint to;
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java
index 6ad1dba93ce..36de458fcfe 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentIT.java
@@ -45,7 +45,7 @@ public class KafkaConsumerIdempotentIT extends KafkaConsumerIdempotentTestSuppor
                     + "?groupId=group2&autoOffsetReset=earliest"
                     + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
                     + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
-                    + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true"
+                    + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true"
                     + "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
     private Endpoint from;
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
index 2e80a8d23b7..2095aafdc0f 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithCustomSerializerIT.java
@@ -41,7 +41,7 @@ public class KafkaConsumerIdempotentWithCustomSerializerIT extends KafkaConsumer
                     + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
                     + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
                     + "&headerDeserializer=#class:org.apache.camel.component.kafka.integration.CustomHeaderDeserializer"
-                    + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true"
+                    + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true"
                     + "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
     private Endpoint from;
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
index 4a5f9ff685f..d68eddfa296 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerIdempotentWithProcessorIT.java
@@ -40,7 +40,7 @@ public class KafkaConsumerIdempotentWithProcessorIT extends KafkaConsumerIdempot
                     + "?groupId=group2&autoOffsetReset=earliest"
                     + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
                     + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
-                    + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true"
+                    + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true"
                     + "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
     private Endpoint from;
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerTopicIsPatternIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerTopicIsPatternIT.java
index a178095e280..bbae0f74910 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerTopicIsPatternIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerTopicIsPatternIT.java
@@ -38,8 +38,9 @@ public class KafkaConsumerTopicIsPatternIT extends BaseEmbeddedKafkaTestSupport
     public static final String TOPIC = "vess123d";
     public static final String TOPIC_PATTERN = "v.*d";
 
-    @EndpointInject("kafka:" + TOPIC_PATTERN + "?topicIsPattern=true&groupId=group1&autoOffsetReset=earliest"
-                    + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor&metadataMaxAgeMs=1000")
+    @EndpointInject("kafka:" + TOPIC_PATTERN
+                    + "?topicIsPattern=true&groupId=KafkaConsumerTopicIsPatternIT&autoOffsetReset=earliest"
+                    + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor&metadataMaxAgeMs=1000")
     private Endpoint from;
 
     @EndpointInject("mock:result")
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerUnresolvableHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerUnresolvableHealthCheckIT.java
index c0395ec7f6c..5f34c34997c 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerUnresolvableHealthCheckIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerUnresolvableHealthCheckIT.java
@@ -72,7 +72,7 @@ public class KafkaConsumerUnresolvableHealthCheckIT extends CamelTestSupport {
     @EndpointInject("kafka:" + TOPIC
                     + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
                     + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
-                    + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+                    + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
     private Endpoint from;
     @EndpointInject("mock:result")
     private MockEndpoint to;
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerNoopCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerNoopCommitIT.java
index a1daccb40df..d69387a6460 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerNoopCommitIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerNoopCommitIT.java
@@ -35,7 +35,7 @@ public class KafkaConsumerNoopCommitIT extends BaseManualCommitTestSupport {
     public static final String TOPIC = "testManualNoopCommitTest";
 
     @EndpointInject("kafka:" + TOPIC
-                    + "?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false"
+                    + "?groupId=KafkaConsumerNoopCommitIT&pollTimeoutMs=1000&autoCommitEnable=false"
                     + "&allowManualCommit=true&autoOffsetReset=earliest&metadataMaxAgeMs=1000")
     private Endpoint from;
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java
index a6e4cddb173..e9967641009 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/commit/KafkaConsumerSyncCommitIT.java
@@ -34,7 +34,7 @@ public class KafkaConsumerSyncCommitIT extends BaseManualCommitTestSupport {
     public static final String TOPIC = "testManualCommitSyncTest";
 
     @EndpointInject("kafka:" + TOPIC
-                    + "?groupId=group1&sessionTimeoutMs=30000&autoCommitEnable=false"
+                    + "?groupId=KafkaConsumerSyncCommitIT&pollTimeoutMs=1000&autoCommitEnable=false"
                     + "&allowManualCommit=true&autoOffsetReset=earliest&kafkaManualCommitFactory=#class:org.apache.camel.component.kafka.consumer.DefaultKafkaManualCommitFactory")
     private Endpoint from;
 
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerCircuitBreakerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerCircuitBreakerIT.java
index 89501aa7c5e..1b813178602 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerCircuitBreakerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerCircuitBreakerIT.java
@@ -63,7 +63,7 @@ public class KafkaPausableConsumerCircuitBreakerIT extends BaseEmbeddedKafkaTest
     @EndpointInject("kafka:" + SOURCE_TOPIC
                     + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
                     + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
-                    + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+                    + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
     private Endpoint from;
 
     @EndpointInject("direct:intermediate")
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java
index 369bd194e9f..a30f21a252b 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/pause/KafkaPausableConsumerIT.java
@@ -81,7 +81,7 @@ public class KafkaPausableConsumerIT extends BaseEmbeddedKafkaTestSupport {
     @EndpointInject("kafka:" + SOURCE_TOPIC
                     + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
                     + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
-                    + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+                    + "&autoCommitIntervalMs=1000&pollTimeoutMs=1000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
     private Endpoint from;
 
     @EndpointInject("direct:intermediate")