You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2024/01/10 09:24:03 UTC

(camel) branch main updated (e970f72480f -> b7dc1da4dbe)

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


    from e970f72480f Revert "Bump org.apache.camel.k:camel-k-crds from 2.1.0 to 2.2.0 (#12641)" (#12732)
     new cf84a59a0ea remove lastResult from Kafka Camel component (#12002)
     new b7dc1da4dbe CAMEL-20044: formatting fixes

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../camel/component/kafka/KafkaFetchRecords.java   | 42 +++----------------
 .../consumer/support/KafkaRecordProcessor.java     | 48 +++++++++-------------
 .../support/KafkaRecordProcessorFacade.java        | 38 +++++++----------
 .../kafka/consumer/support/ProcessingResult.java   | 21 +---------
 ...eakOnFirstErrorOffUsingKafkaManualCommitIT.java |  4 +-
 .../KafkaBreakOnFirstErrorReplayOldMessagesIT.java |  4 +-
 .../KafkaBreakOnFirstErrorSeekIssueIT.java         |  4 +-
 ...stErrorWithBatchUsingAsynchCommitManagerIT.java |  4 +-
 ...irstErrorWithBatchUsingKafkaManualCommitIT.java |  4 +-
 ...rrorWithBatchUsingKafkaManualCommitRetryIT.java |  4 +-
 ...rstErrorWithBatchUsingSynchCommitManagerIT.java |  4 +-
 .../health/KafkaConsumerHealthCheckIT.java         |  4 +-
 12 files changed, 58 insertions(+), 123 deletions(-)


(camel) 01/02: remove lastResult from Kafka Camel component (#12002)

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit cf84a59a0ead8a58a9ab4670ff37dfb942c963c1
Author: Mike Barlotta <Co...@users.noreply.github.com>
AuthorDate: Tue Jan 9 09:11:48 2024 -0500

    remove lastResult from Kafka Camel component (#12002)
---
 .../camel/component/kafka/KafkaFetchRecords.java   | 46 ++++-----------------
 .../consumer/support/KafkaRecordProcessor.java     | 48 +++++++++-------------
 .../support/KafkaRecordProcessorFacade.java        | 37 ++++++-----------
 .../kafka/consumer/support/ProcessingResult.java   | 20 +--------
 4 files changed, 43 insertions(+), 108 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 2cdd516d0c4..9a5004cc2de 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -305,8 +305,7 @@ public class KafkaFetchRecords implements Runnable {
     }
 
     protected void startPolling() {
-        long partitionLastOffset = -1;
-
+        
         try {
             /*
              * We lock the processing of the record to avoid raising a WakeUpException as a result to a call
@@ -315,7 +314,8 @@ public class KafkaFetchRecords implements Runnable {
             lock.lock();
 
             long pollTimeoutMs = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
-
+            Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
+            
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Polling {} from {} with timeout: {}", threadId, getPrintableTopic(), pollTimeoutMs);
             }
@@ -323,10 +323,6 @@ public class KafkaFetchRecords implements Runnable {
             KafkaRecordProcessorFacade recordProcessorFacade = new KafkaRecordProcessorFacade(
                     kafkaConsumer, threadId, commitManager, consumerListener);
 
-            Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
-
-            ProcessingResult lastResult = null;
-
             while (isKafkaConsumerRunnableAndNotStopped() && isConnected() && pollExceptionStrategy.canContinue()) {
                 ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration);
                 if (consumerListener != null) {
@@ -335,43 +331,15 @@ public class KafkaFetchRecords implements Runnable {
                     }
                 }
 
-                if (lastResult != null) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("This polling iteration is using lastresult on partition {} and offset {}",
-                                lastResult.getPartition(), lastResult.getPartitionLastOffset());
-                    }
-                } else {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("This polling iteration is using lastresult of null");
-                    }
-                }
-
-                ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords, lastResult);
-
-                if (result != null) {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("This polling iteration had a result returned for partition {} and offset {}",
-                                result.getPartition(), result.getPartitionLastOffset());
-                    }
-                } else {
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("This polling iteration had a result returned as null");
-                    }
-                }
-
+                ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords);
                 updateTaskState();
+                
+                // when breakOnFirstError we want to unsubscribe from Kafka 
                 if (result != null && result.isBreakOnErrorHit() && !this.state.equals(State.PAUSED)) {
                     LOG.debug("We hit an error ... setting flags to force reconnect");
                     // force re-connect
                     setReconnect(true);
                     setConnected(false);
-                } else {
-                    lastResult = result;
-
-                    if (LOG.isTraceEnabled()) {
-                        LOG.trace("Setting lastresult to partition {} and offset {}",
-                                lastResult.getPartition(), lastResult.getPartitionLastOffset());
-                    }
                 }
 
             }
@@ -405,6 +373,8 @@ public class KafkaFetchRecords implements Runnable {
                         e.getClass().getName(), threadId, getPrintableTopic(), e.getMessage());
             }
 
+            // why do we set this to -1
+            long partitionLastOffset = -1;
             pollExceptionStrategy.handle(partitionLastOffset, e);
         } finally {
             // only close if not retry
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
index 2954836f5f6..b06e53493e6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
@@ -23,7 +23,6 @@ import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.component.kafka.KafkaConfiguration;
 import org.apache.camel.component.kafka.KafkaConstants;
-import org.apache.camel.component.kafka.consumer.AbstractCommitManager;
 import org.apache.camel.component.kafka.consumer.CommitManager;
 import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
 import org.apache.camel.component.kafka.serde.KafkaHeaderDeserializer;
@@ -86,8 +85,7 @@ public class KafkaRecordProcessor {
 
     public ProcessingResult processExchange(
             Exchange exchange, TopicPartition topicPartition, boolean partitionHasNext,
-            boolean recordHasNext, ConsumerRecord<Object, Object> record, ProcessingResult lastResult,
-            ExceptionHandler exceptionHandler) {
+            boolean recordHasNext, ConsumerRecord<Object, Object> record, ExceptionHandler exceptionHandler) {
 
         Message message = exchange.getMessage();
 
@@ -114,33 +112,31 @@ public class KafkaRecordProcessor {
         } catch (Exception e) {
             exchange.setException(e);
         }
+
+        ProcessingResult result = ProcessingResult.newUnprocessed();
         if (exchange.getException() != null) {
             LOG.debug("An exception was thrown for record at partition {} and offset {}",
                     record.partition(), record.offset());
 
-            boolean breakOnErrorExit = processException(exchange, topicPartition, record, lastResult,
-                    exceptionHandler);
-            return new ProcessingResult(breakOnErrorExit, lastResult.getPartition(), lastResult.getPartitionLastOffset(), true);
+            boolean breakOnErrorExit = processException(exchange, topicPartition, record, exceptionHandler);
+            result = new ProcessingResult(breakOnErrorExit, true);
         } else {
-            return new ProcessingResult(false, record.partition(), record.offset(), exchange.getException() != null);
+            result = new ProcessingResult(false, exchange.getException() != null);
+        }
+
+        if (!result.isBreakOnErrorHit()) {
+            commitManager.recordOffset(topicPartition, record.offset());
         }
+
+        return result;
     }
 
     private boolean processException(
             Exchange exchange, TopicPartition topicPartition,
-            ConsumerRecord<Object, Object> consumerRecord, ProcessingResult lastResult,
-            ExceptionHandler exceptionHandler) {
+            ConsumerRecord<Object, Object> consumerRecord, ExceptionHandler exceptionHandler) {
 
         // processing failed due to an unhandled exception, what should we do
         if (configuration.isBreakOnFirstError()) {
-            if (lastResult.getPartition() != -1 &&
-                    lastResult.getPartition() != consumerRecord.partition()) {
-                LOG.error("About to process an exception with UNEXPECTED partition & offset. Got topic partition {}. " +
-                          " The last result was on partition {} with offset {} but was expecting partition {} with offset {}",
-                        topicPartition.partition(), lastResult.getPartition(), lastResult.getPartitionLastOffset(),
-                        consumerRecord.partition(), consumerRecord.offset());
-            }
-
             // we are failing and we should break out
             if (LOG.isWarnEnabled()) {
                 Exception exc = exchange.getException();
@@ -150,17 +146,13 @@ public class KafkaRecordProcessor {
                         consumerRecord.offset(), consumerRecord.partition());
             }
 
-            // force commit, so we resume on next poll where we failed
-            // except when the failure happened at the first message in a poll
-            if (lastResult.getPartitionLastOffset() != AbstractCommitManager.START_OFFSET) {
-                // we should just do a commit (vs the original forceCommit)
-                // when route uses NOOP Commit Manager it will rely
-                // on the route implementation to explicitly commit offset
-                // when route uses Synch/Asynch Commit Manager it will
-                // ALWAYS commit the offset for the failing record
-                // and will ALWAYS retry it
-                commitManager.commit(topicPartition);
-            }
+            // we should just do a commit (vs the original forceCommit)
+            // when route uses NOOP Commit Manager it will rely
+            // on the route implementation to explicitly commit offset
+            // when route uses Synch/Asynch Commit Manager it will
+            // ALWAYS commit the offset for the failing record
+            // and will ALWAYS retry it
+            commitManager.commit(topicPartition);
 
             // continue to next partition
             return true;
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
index f6317510fd9..470e2ea01e6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
@@ -54,18 +54,16 @@ public class KafkaRecordProcessorFacade {
         return camelKafkaConsumer.isStopping();
     }
 
-    public ProcessingResult processPolledRecords(
-            ConsumerRecords<Object, Object> allRecords, ProcessingResult resultFromPreviousPoll) {
+    public ProcessingResult processPolledRecords(ConsumerRecords<Object, Object> allRecords) {
         logRecords(allRecords);
 
+        ProcessingResult result = ProcessingResult.newUnprocessed();
+
         Set<TopicPartition> partitions = allRecords.partitions();
         Iterator<TopicPartition> partitionIterator = partitions.iterator();
 
         LOG.debug("Poll received records on {} partitions", partitions.size());
 
-        ProcessingResult lastResult
-                = resultFromPreviousPoll == null ? ProcessingResult.newUnprocessed() : resultFromPreviousPoll;
-
         while (partitionIterator.hasNext() && !isStopping()) {
             TopicPartition partition = partitionIterator.next();
 
@@ -76,36 +74,33 @@ public class KafkaRecordProcessorFacade {
 
             logRecordsInPartition(partitionRecords, partition);
 
-            while (!lastResult.isBreakOnErrorHit() && recordIterator.hasNext() && !isStopping()) {
+            while (!result.isBreakOnErrorHit() && recordIterator.hasNext() && !isStopping()) {
                 ConsumerRecord<Object, Object> consumerRecord = recordIterator.next();
 
                 LOG.debug("Processing record on partition {} with offset {}", consumerRecord.partition(),
                         consumerRecord.offset());
 
-                lastResult = processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(), lastResult,
+                result = processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(),
                         kafkaRecordProcessor, consumerRecord);
 
-                LOG.debug(
-                        "Processed record on partition {} with offset {} and got ProcessingResult for partition {} and offset {}",
-                        consumerRecord.partition(), consumerRecord.offset(), lastResult.getPartition(),
-                        lastResult.getPartitionLastOffset());
+                LOG.debug("Processed record on partition {} with offset {}", consumerRecord.partition(), consumerRecord.offset());
 
                 if (consumerListener != null) {
-                    if (!consumerListener.afterProcess(lastResult)) {
+                    if (!consumerListener.afterProcess(result)) {
                         commitManager.commit(partition);
-                        return lastResult;
+                        return result;
                     }
                 }
             }
 
-            if (!lastResult.isBreakOnErrorHit()) {
+            if (!result.isBreakOnErrorHit()) {
                 LOG.debug("Committing offset on successful execution");
                 // all records processed from partition so commit them
                 commitManager.commit(partition);
             }
         }
 
-        return lastResult;
+        return result;
     }
 
     private void logRecordsInPartition(List<ConsumerRecord<Object, Object>> partitionRecords, TopicPartition partition) {
@@ -125,7 +120,6 @@ public class KafkaRecordProcessorFacade {
             TopicPartition partition,
             boolean partitionHasNext,
             boolean recordHasNext,
-            final ProcessingResult lastResult,
             KafkaRecordProcessor kafkaRecordProcessor,
             ConsumerRecord<Object, Object> consumerRecord) {
 
@@ -133,18 +127,13 @@ public class KafkaRecordProcessorFacade {
 
         Exchange exchange = camelKafkaConsumer.createExchange(false);
 
-        ProcessingResult currentResult
-                = kafkaRecordProcessor.processExchange(exchange, partition, partitionHasNext,
-                        recordHasNext, consumerRecord, lastResult, camelKafkaConsumer.getExceptionHandler());
-
-        if (!currentResult.isBreakOnErrorHit()) {
-            commitManager.recordOffset(partition, currentResult.getPartitionLastOffset());
-        }
+        ProcessingResult result = kafkaRecordProcessor.processExchange(exchange, partition, partitionHasNext,
+                recordHasNext, consumerRecord, camelKafkaConsumer.getExceptionHandler());
 
         // success so release the exchange
         camelKafkaConsumer.releaseExchange(exchange, false);
 
-        return currentResult;
+        return result;
     }
 
     private void logRecord(ConsumerRecord<Object, Object> consumerRecord) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
index fe3afd6ee8d..87f88c6e23d 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
@@ -17,24 +17,16 @@
 
 package org.apache.camel.component.kafka.consumer.support;
 
-import org.apache.camel.component.kafka.consumer.AbstractCommitManager;
 
 public final class ProcessingResult {
     private static final ProcessingResult UNPROCESSED_RESULT
-            = new ProcessingResult(
-                    false,
-                    AbstractCommitManager.NON_PARTITION,
-                    AbstractCommitManager.START_OFFSET, false);
+            = new ProcessingResult(false, false);
 
     private final boolean breakOnErrorHit;
-    private final long lastPartition;
-    private final long partitionLastOffset;
     private final boolean failed;
 
-    ProcessingResult(boolean breakOnErrorHit, long lastPartition, long partitionLastOffset, boolean failed) {
+    ProcessingResult(boolean breakOnErrorHit, boolean failed) {
         this.breakOnErrorHit = breakOnErrorHit;
-        this.lastPartition = lastPartition;
-        this.partitionLastOffset = partitionLastOffset;
         this.failed = failed;
     }
 
@@ -42,14 +34,6 @@ public final class ProcessingResult {
         return breakOnErrorHit;
     }
 
-    public long getPartitionLastOffset() {
-        return partitionLastOffset;
-    }
-
-    public long getPartition() {
-        return lastPartition;
-    }
-
     public boolean isFailed() {
         return failed;
     }


(camel) 02/02: CAMEL-20044: formatting fixes

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit b7dc1da4dbef98d9af96fe4abc0b75cc0578feeb
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Jan 9 17:39:40 2024 +0100

    CAMEL-20044: formatting fixes
---
 .../java/org/apache/camel/component/kafka/KafkaFetchRecords.java  | 8 ++++----
 .../kafka/consumer/support/KafkaRecordProcessorFacade.java        | 3 ++-
 .../camel/component/kafka/consumer/support/ProcessingResult.java  | 1 -
 .../KafkaBreakOnFirstErrorOffUsingKafkaManualCommitIT.java        | 4 ++--
 .../integration/KafkaBreakOnFirstErrorReplayOldMessagesIT.java    | 4 ++--
 .../kafka/integration/KafkaBreakOnFirstErrorSeekIssueIT.java      | 4 ++--
 ...KafkaBreakOnFirstErrorWithBatchUsingAsynchCommitManagerIT.java | 4 ++--
 .../KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitIT.java  | 4 ++--
 ...kaBreakOnFirstErrorWithBatchUsingKafkaManualCommitRetryIT.java | 4 ++--
 .../KafkaBreakOnFirstErrorWithBatchUsingSynchCommitManagerIT.java | 4 ++--
 .../kafka/integration/health/KafkaConsumerHealthCheckIT.java      | 4 ++--
 11 files changed, 22 insertions(+), 22 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 9a5004cc2de..2c32e727a50 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -305,7 +305,7 @@ public class KafkaFetchRecords implements Runnable {
     }
 
     protected void startPolling() {
-        
+
         try {
             /*
              * We lock the processing of the record to avoid raising a WakeUpException as a result to a call
@@ -315,7 +315,7 @@ public class KafkaFetchRecords implements Runnable {
 
             long pollTimeoutMs = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
             Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
-            
+
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Polling {} from {} with timeout: {}", threadId, getPrintableTopic(), pollTimeoutMs);
             }
@@ -333,8 +333,8 @@ public class KafkaFetchRecords implements Runnable {
 
                 ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords);
                 updateTaskState();
-                
-                // when breakOnFirstError we want to unsubscribe from Kafka 
+
+                // when breakOnFirstError we want to unsubscribe from Kafka
                 if (result != null && result.isBreakOnErrorHit() && !this.state.equals(State.PAUSED)) {
                     LOG.debug("We hit an error ... setting flags to force reconnect");
                     // force re-connect
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
index 470e2ea01e6..8965a1037c4 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
@@ -83,7 +83,8 @@ public class KafkaRecordProcessorFacade {
                 result = processRecord(partition, partitionIterator.hasNext(), recordIterator.hasNext(),
                         kafkaRecordProcessor, consumerRecord);
 
-                LOG.debug("Processed record on partition {} with offset {}", consumerRecord.partition(), consumerRecord.offset());
+                LOG.debug("Processed record on partition {} with offset {}", consumerRecord.partition(),
+                        consumerRecord.offset());
 
                 if (consumerListener != null) {
                     if (!consumerListener.afterProcess(result)) {
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
index 87f88c6e23d..1728616cf9e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
@@ -17,7 +17,6 @@
 
 package org.apache.camel.component.kafka.consumer.support;
 
-
 public final class ProcessingResult {
     private static final ProcessingResult UNPROCESSED_RESULT
             = new ProcessingResult(false, false);
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorOffUsingKafkaManualCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorOffUsingKafkaManualCommitIT.java
index bc3ad03f37e..4d1d355e935 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorOffUsingKafkaManualCommitIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorOffUsingKafkaManualCommitIT.java
@@ -48,8 +48,8 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
  */
 @Tags({ @Tag("breakOnFirstError") })
 @EnabledOnOs(value = { OS.LINUX, OS.MAC, OS.FREEBSD, OS.OPENBSD, OS.WINDOWS },
-        architectures = { "amd64", "aarch_64", "s390x" },
-        disabledReason = "This test does not run reliably on ppc64le")
+             architectures = { "amd64", "aarch_64", "s390x" },
+             disabledReason = "This test does not run reliably on ppc64le")
 class KafkaBreakOnFirstErrorOffUsingKafkaManualCommitIT extends BaseEmbeddedKafkaTestSupport {
     public static final String ROUTE_ID = "breakOnFirstErrorOff";
     public static final String TOPIC = "breakOnFirstErrorOff";
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorReplayOldMessagesIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorReplayOldMessagesIT.java
index 7f401847667..9025cea92a0 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorReplayOldMessagesIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorReplayOldMessagesIT.java
@@ -55,8 +55,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  */
 @Tags({ @Tag("breakOnFirstError") })
 @EnabledOnOs(value = { OS.LINUX, OS.MAC, OS.FREEBSD, OS.OPENBSD, OS.WINDOWS },
-        architectures = { "amd64", "aarch_64", "s390x" },
-        disabledReason = "This test does not run reliably on ppc64le")
+             architectures = { "amd64", "aarch_64", "s390x" },
+             disabledReason = "This test does not run reliably on ppc64le")
 class KafkaBreakOnFirstErrorReplayOldMessagesIT extends BaseEmbeddedKafkaTestSupport {
 
     public static final String ROUTE_ID = "breakOnFirstError-20044";
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorSeekIssueIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorSeekIssueIT.java
index 6a0069201aa..f59cbefb485 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorSeekIssueIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorSeekIssueIT.java
@@ -52,8 +52,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  */
 @Tags({ @Tag("breakOnFirstError") })
 @EnabledOnOs(value = { OS.LINUX, OS.MAC, OS.FREEBSD, OS.OPENBSD, OS.WINDOWS },
-        architectures = { "amd64", "aarch_64", "s390x" },
-        disabledReason = "This test does not run reliably on ppc64le")
+             architectures = { "amd64", "aarch_64", "s390x" },
+             disabledReason = "This test does not run reliably on ppc64le")
 class KafkaBreakOnFirstErrorSeekIssueIT extends BaseEmbeddedKafkaTestSupport {
 
     public static final String ROUTE_ID = "breakOnFirstError-19894";
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingAsynchCommitManagerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingAsynchCommitManagerIT.java
index c02b3524a0f..f0842dac801 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingAsynchCommitManagerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingAsynchCommitManagerIT.java
@@ -48,8 +48,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
  */
 @Tags({ @Tag("breakOnFirstError") })
 @EnabledOnOs(value = { OS.LINUX, OS.MAC, OS.FREEBSD, OS.OPENBSD, OS.WINDOWS },
-        architectures = { "amd64", "aarch_64", "s390x" },
-        disabledReason = "This test does not run reliably on ppc64le")
+             architectures = { "amd64", "aarch_64", "s390x" },
+             disabledReason = "This test does not run reliably on ppc64le")
 class KafkaBreakOnFirstErrorWithBatchUsingAsynchCommitManagerIT extends BaseEmbeddedKafkaTestSupport {
     public static final String ROUTE_ID = "breakOnFirstErrorBatchIT";
     public static final String TOPIC = "breakOnFirstErrorBatchIT";
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitIT.java
index b0e719751a2..9c8bbe3f503 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitIT.java
@@ -48,8 +48,8 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
  */
 @Tags({ @Tag("breakOnFirstError") })
 @EnabledOnOs(value = { OS.LINUX, OS.MAC, OS.FREEBSD, OS.OPENBSD, OS.WINDOWS },
-        architectures = { "amd64", "aarch_64", "s390x" },
-        disabledReason = "This test does not run reliably on ppc64le")
+             architectures = { "amd64", "aarch_64", "s390x" },
+             disabledReason = "This test does not run reliably on ppc64le")
 class KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitIT extends BaseEmbeddedKafkaTestSupport {
     public static final String ROUTE_ID = "breakOnFirstErrorBatchOnExceptionIT";
     public static final String TOPIC = "breakOnFirstErrorBatchOnExceptionIT";
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitRetryIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitRetryIT.java
index 504a34feb13..e0640c82757 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitRetryIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitRetryIT.java
@@ -52,8 +52,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  */
 @Tags({ @Tag("breakOnFirstError") })
 @EnabledOnOs(value = { OS.LINUX, OS.MAC, OS.FREEBSD, OS.OPENBSD, OS.WINDOWS },
-        architectures = { "amd64", "aarch_64", "s390x" },
-        disabledReason = "This test does not run reliably on ppc64le")
+             architectures = { "amd64", "aarch_64", "s390x" },
+             disabledReason = "This test does not run reliably on ppc64le")
 class KafkaBreakOnFirstErrorWithBatchUsingKafkaManualCommitRetryIT extends BaseEmbeddedKafkaTestSupport {
 
     public static final String ROUTE_ID = "breakOnFirstErrorBatchRetryIT";
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingSynchCommitManagerIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingSynchCommitManagerIT.java
index f025d3b9917..e944fdfaa19 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingSynchCommitManagerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorWithBatchUsingSynchCommitManagerIT.java
@@ -48,8 +48,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
  */
 @Tags({ @Tag("breakOnFirstError") })
 @EnabledOnOs(value = { OS.LINUX, OS.MAC, OS.FREEBSD, OS.OPENBSD, OS.WINDOWS },
-        architectures = { "amd64", "aarch_64", "s390x" },
-        disabledReason = "This test does not run reliably on ppc64le")
+             architectures = { "amd64", "aarch_64", "s390x" },
+             disabledReason = "This test does not run reliably on ppc64le")
 class KafkaBreakOnFirstErrorWithBatchUsingSynchCommitManagerIT extends BaseEmbeddedKafkaTestSupport {
     public static final String ROUTE_ID = "breakOnFirstErrorBatchIT";
     public static final String TOPIC = "breakOnFirstErrorBatchIT";
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerHealthCheckIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerHealthCheckIT.java
index 73b95b0eb07..b1c2a787da7 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerHealthCheckIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/health/KafkaConsumerHealthCheckIT.java
@@ -59,8 +59,8 @@ import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
 @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 @Tags({ @Tag("health") })
 @EnabledOnOs(value = { OS.LINUX, OS.MAC, OS.FREEBSD, OS.OPENBSD, OS.WINDOWS },
-        architectures = { "amd64", "aarch_64", "s390x" },
-        disabledReason = "This test does not run reliably on ppc64le")
+             architectures = { "amd64", "aarch_64", "s390x" },
+             disabledReason = "This test does not run reliably on ppc64le")
 public class KafkaConsumerHealthCheckIT extends KafkaHealthCheckTestSupport {
     public static final String TOPIC = "test-health";
     public static final String SKIPPED_HEADER_KEY = "CamelSkippedHeader";