You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2024/01/12 16:24:50 UTC

(camel) 01/03: CAMEL-20297 camel-core-processor: do not swallow interrupted exceptions

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 33ed6c053ed1a1466a0f9e4698da6b54d881341c
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Jan 12 10:57:38 2024 +0100

    CAMEL-20297 camel-core-processor: do not swallow interrupted exceptions
---
 .../java/org/apache/camel/processor/DelayProcessorSupport.java     | 7 +++++++
 .../main/java/org/apache/camel/processor/MulticastProcessor.java   | 2 +-
 .../src/main/java/org/apache/camel/processor/Resequencer.java      | 1 +
 .../main/java/org/apache/camel/processor/StreamResequencer.java    | 2 ++
 .../org/apache/camel/processor/aggregate/AggregateProcessor.java   | 1 +
 .../camel/processor/errorhandler/RedeliveryErrorHandler.java       | 2 ++
 6 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
index 8f8c17b7037..337fc6043d7 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
@@ -102,6 +102,12 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
                 delay(delay, exchange);
                 // then continue routing
                 return processor.process(exchange, callback);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                // exception occurred so we are done
+                exchange.setException(e);
+                callback.done(true);
+                return true;
             } catch (Exception e) {
                 // exception occurred so we are done
                 exchange.setException(e);
@@ -138,6 +144,7 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
                             delay(delay, exchange);
                         } catch (InterruptedException ie) {
                             exchange.setException(ie);
+                            Thread.currentThread().interrupt();
                         }
                         // then continue routing
                         return processor.process(exchange, callback);
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 6efdaebff5d..0a2333ba89a 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -731,7 +731,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
                 try {
                     Thread.sleep(delay);
                 } catch (InterruptedException e) {
-                    // ignore
+                    Thread.currentThread().interrupt();
                 }
                 runnable.run();
             });
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
index 0375a20e41b..68585f92c50 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
@@ -491,6 +491,7 @@ public class Resequencer extends AsyncProcessorSupport implements Navigate<Proce
                         }
 
                     } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
                         break;
                     }
 
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java
index 76730878569..2771df6c9fb 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java
@@ -243,6 +243,7 @@ public class StreamResequencer extends AsyncProcessorSupport
             try {
                 Thread.sleep(getTimeout());
             } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
                 // we were interrupted so break out
                 exchange.setException(e);
                 callback.done(true);
@@ -303,6 +304,7 @@ public class StreamResequencer extends AsyncProcessorSupport
                         deliveryRequestLock.unlock();
                     }
                 } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
                     break;
                 }
                 try {
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index d97227d0d65..e278a5db1e4 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -1703,6 +1703,7 @@ public class AggregateProcessor extends AsyncProcessorSupport
             try {
                 Thread.sleep(100);
             } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
                 // break out as we got interrupted such as the JVM terminating
                 LOG.warn("Interrupted while waiting for {} inflight exchanges to complete.", getInProgressCompleteExchanges());
                 break;
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 60063459add..21c66770330 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -827,6 +827,8 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
                             // as we do not want to continue routing (for example a task has been cancelled)
                             exchange.setRouteStop(true);
                             reactiveExecutor.schedule(callback);
+
+                            Thread.currentThread().interrupt();
                         }
                     }
                 } else {