You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2024/01/08 16:10:24 UTC

(camel) 02/06: CAMEL-20297 camel-kafka: do not swallow interrupted exceptions

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 56327a735c004f14136d90f1c565215c5aedcd0a
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Jan 5 11:19:38 2024 +0100

    CAMEL-20297 camel-kafka: do not swallow interrupted exceptions
---
 .../processor/idempotent/kafka/KafkaIdempotentRepository.java | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
index 2169815777d..84257097d00 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
@@ -363,7 +363,8 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
                         topic);
             }
         } catch (InterruptedException e) {
-            log.warn("Interrupted while warming up cache. This exception is ignored.", e);
+            log.warn("Interrupted while warming up cache.", e);
+            Thread.currentThread().interrupt();
         }
     }
 
@@ -378,7 +379,8 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
                 log.warn("Timeout waiting for cache to shutdown from topic {}. Proceeding anyway.", topic);
             }
         } catch (InterruptedException e) {
-            log.warn("Interrupted waiting on shutting down cache due {}. This exception is ignored.", e.getMessage());
+            log.warn("Interrupted waiting on shutting down cache due {}.", e.getMessage());
+            Thread.currentThread().interrupt();
         }
         camelContext.getExecutorServiceManager().shutdown(executorService);
 
@@ -406,7 +408,10 @@ public class KafkaIdempotentRepository extends ServiceSupport implements Idempot
             ObjectHelper.notNull(producer, "producer");
 
             producer.send(new ProducerRecord<>(topic, key, action.toString())).get(); // sync send
-        } catch (ExecutionException | InterruptedException e) {
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeCamelException(e);
+        } catch (ExecutionException e) {
             throw new RuntimeCamelException(e);
         }
     }