You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gz...@apache.org on 2024/01/24 10:15:55 UTC
(camel) branch release/3.21.4 updated: [Minor] Apply formatter
This is an automated email from the ASF dual-hosted git repository.
gzurowski pushed a commit to branch release/3.21.4
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/release/3.21.4 by this push:
new ef1ca2cc280 [Minor] Apply formatter
ef1ca2cc280 is described below
commit ef1ca2cc2800a42a2a06d7f3bc2435c8e048cc82
Author: Gregor Zurowski <gr...@zurowski.org>
AuthorDate: Wed Jan 24 11:15:38 2024 +0100
[Minor] Apply formatter
---
.../java/org/apache/camel/component/kafka/KafkaFetchRecords.java | 6 +++---
.../component/kafka/consumer/support/KafkaRecordProcessor.java | 4 ++--
.../kafka/consumer/support/KafkaRecordProcessorFacade.java | 4 ++--
.../camel/component/kafka/consumer/support/ProcessingResult.java | 1 -
4 files changed, 7 insertions(+), 8 deletions(-)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index fe9b3bdf9ee..7c0c004e682 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -297,7 +297,7 @@ public class KafkaFetchRecords implements Runnable {
}
protected void startPolling() {
-
+
try {
/*
* We lock the processing of the record to avoid raising a WakeUpException as a result to a call
@@ -307,7 +307,7 @@ public class KafkaFetchRecords implements Runnable {
long pollTimeoutMs = kafkaConsumer.getEndpoint().getConfiguration().getPollTimeoutMs();
Duration pollDuration = Duration.ofMillis(pollTimeoutMs);
-
+
if (LOG.isTraceEnabled()) {
LOG.trace("Polling {} from {} with timeout: {}", threadId, getPrintableTopic(), pollTimeoutMs);
}
@@ -325,7 +325,7 @@ public class KafkaFetchRecords implements Runnable {
ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords);
updateTaskState();
-
+
// when breakOnFirstError we want to unsubscribe from Kafka
if (result != null && result.isBreakOnErrorHit() && !this.state.equals(State.PAUSED)) {
LOG.debug("We hit an error ... setting flags to force reconnect");
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
index 1299bdc0ba8..2cb1a859745 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessor.java
@@ -112,7 +112,7 @@ public class KafkaRecordProcessor {
} catch (Exception e) {
exchange.setException(e);
}
-
+
ProcessingResult result = ProcessingResult.newUnprocessed();
if (exchange.getException() != null) {
LOG.debug("An exception was thrown for record at partition {} and offset {}",
@@ -123,7 +123,7 @@ public class KafkaRecordProcessor {
} else {
result = new ProcessingResult(false, exchange.getException() != null);
}
-
+
if (!result.isBreakOnErrorHit()) {
commitManager.recordOffset(topicPartition, record.offset());
}
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
index b114c3ff8a6..65f43b265f6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/KafkaRecordProcessorFacade.java
@@ -56,7 +56,7 @@ public class KafkaRecordProcessorFacade {
public ProcessingResult processPolledRecords(ConsumerRecords<Object, Object> allRecords) {
logRecords(allRecords);
-
+
ProcessingResult result = ProcessingResult.newUnprocessed();
Set<TopicPartition> partitions = allRecords.partitions();
@@ -128,7 +128,7 @@ public class KafkaRecordProcessorFacade {
ProcessingResult result = kafkaRecordProcessor.processExchange(exchange, partition, partitionHasNext,
recordHasNext, record, camelKafkaConsumer.getExceptionHandler());
-
+
// success so release the exchange
camelKafkaConsumer.releaseExchange(exchange, false);
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
index 87f88c6e23d..1728616cf9e 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/consumer/support/ProcessingResult.java
@@ -17,7 +17,6 @@
package org.apache.camel.component.kafka.consumer.support;
-
public final class ProcessingResult {
private static final ProcessingResult UNPROCESSED_RESULT
= new ProcessingResult(false, false);