You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2022/05/31 10:09:02 UTC

[camel] branch main updated (72d1ee227b2 -> cd2ef6fcf52)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


    from 72d1ee227b2 CAMEL-18151: camel-jbang - Export command for spring boot
     new 6a2aacc1479 (chores) camel-kafka: fix flaky manual async commit test
     new cd2ef6fcf52 (chores) camel-kafka: remove deprecated method usages

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../camel/component/kafka/KafkaProducerTest.java   |  6 ++--
 .../KafkaConsumerAsyncManualCommitIT.java          | 42 ++++++++++++++++------
 2 files changed, 35 insertions(+), 13 deletions(-)


[camel] 01/02: (chores) camel-kafka: fix flaky manual async commit test

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 6a2aacc1479a8043280c405b5e6487ac69be5614
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue May 31 10:48:33 2022 +0200

    (chores) camel-kafka: fix flaky manual async commit test
---
 .../KafkaConsumerAsyncManualCommitIT.java          | 42 ++++++++++++++++------
 1 file changed, 32 insertions(+), 10 deletions(-)

diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
index f1e007482d5..bfda5a5a5e8 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerAsyncManualCommitIT.java
@@ -16,8 +16,9 @@
  */
 package org.apache.camel.component.kafka.integration;
 
-import java.util.Collections;
+import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.BindToRegistry;
 import org.apache.camel.Endpoint;
@@ -31,15 +32,23 @@ import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
 import org.apache.camel.component.kafka.consumer.KafkaManualCommitFactory;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestMethodOrder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSupport {
     public static final String TOPIC = "testManualCommitTest";
 
@@ -74,8 +83,6 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo
         if (producer != null) {
             producer.close();
         }
-        // clean all test topics
-        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
     }
 
     @Override
@@ -111,12 +118,13 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo
         };
     }
 
-    @RepeatedTest(4)
-    public void kafkaManualCommit() throws Exception {
+    @DisplayName("Tests that LAST_RECORD_BEFORE_COMMIT header includes a value")
+    @Order(1)
+    @Test
+    void testLastRecordBeforeCommitHeader() {
         to.expectedMessageCount(5);
         to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
-        // The LAST_RECORD_BEFORE_COMMIT header should include a value as we use
-        // manual commit
+
         to.allMessages().header(KafkaConstants.LAST_RECORD_BEFORE_COMMIT).isNotNull();
 
         for (int k = 0; k < 5; k++) {
@@ -125,8 +133,16 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo
             producer.send(data);
         }
 
-        to.assertIsSatisfied(3000);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> to.assertIsSatisfied());
+
+        List<Exchange> exchangeList = to.getExchanges();
+        assertEquals(5, exchangeList.size());
+        assertEquals(true, exchangeList.get(4).getMessage().getHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, Boolean.class));
+    }
 
+    @Order(2)
+    @Test
+    void kafkaManualCommit() throws Exception {
         to.reset();
 
         // Second step: We shut down our route, we expect nothing will be recovered by our route
@@ -141,16 +157,22 @@ public class KafkaConsumerAsyncManualCommitIT extends BaseEmbeddedKafkaTestSuppo
         }
 
         to.assertIsSatisfied(3000);
+    }
 
+    @Order(3)
+    @Test
+    void testResumeFromTheRightPoint() throws Exception {
         to.reset();
 
         // Fourth step: We start again our route, since we have been committing the offsets from the first step,
         // we will expect to consume from the latest committed offset e.g from offset 5
         context.getRouteController().startRoute("foo");
+
         to.expectedMessageCount(3);
         to.expectedBodiesReceivedInAnyOrder("message-5", "message-6", "message-7");
 
-        to.assertIsSatisfied(3000);
+        Awaitility.await().atMost(5, TimeUnit.SECONDS)
+                .untilAsserted(() -> to.assertIsSatisfied());
 
         assertEquals(0, failCount, "There should have been 0 commit failures");
     }


[camel] 02/02: (chores) camel-kafka: remove deprecated method usages

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit cd2ef6fcf520f6477e1e7eb6012b62ed666c3da1
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue May 31 11:00:37 2022 +0200

    (chores) camel-kafka: remove deprecated method usages
---
 .../java/org/apache/camel/component/kafka/KafkaProducerTest.java    | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 3261e2a459a..86cc411317a 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -87,7 +87,7 @@ public class KafkaProducerTest {
         fromEndpoint.doBuild();
         assertTrue(fromEndpoint.getKafkaClientFactory() instanceof DefaultKafkaClientFactory);
 
-        RecordMetadata rm = new RecordMetadata(null, 0, 0, 0, 0L, 0, 0);
+        RecordMetadata rm = new RecordMetadata(null, 0, 0, 0, 0, 0);
         Future future = Mockito.mock(Future.class);
         Mockito.when(future.get()).thenReturn(rm);
         org.apache.kafka.clients.producer.KafkaProducer kp
@@ -154,7 +154,7 @@ public class KafkaProducerTest {
         ArgumentCaptor<Callback> callBackCaptor = ArgumentCaptor.forClass(Callback.class);
         Mockito.verify(producer.getKafkaProducer()).send(any(ProducerRecord.class), callBackCaptor.capture());
         Callback kafkaCallback = callBackCaptor.getValue();
-        kafkaCallback.onCompletion(new RecordMetadata(null, 0, 0, 0, 0L, 0, 0), null);
+        kafkaCallback.onCompletion(new RecordMetadata(null, 0, 0, 0, 0, 0), null);
         assertRecordMetadataExists();
     }
 
@@ -177,7 +177,7 @@ public class KafkaProducerTest {
         Mockito.verify(exchange).setException(isA(ApiException.class));
         Mockito.verify(callback).done(eq(true));
         Callback kafkaCallback = callBackCaptor.getValue();
-        kafkaCallback.onCompletion(new RecordMetadata(null, 0, 0, 0, 0L, 0, 0), null);
+        kafkaCallback.onCompletion(new RecordMetadata(null, 0, 0, 0, 0, 0), null);
         assertRecordMetadataExists();
     }