You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2024/01/12 16:24:50 UTC
(camel) 01/03: CAMEL-20297 camel-core-processor: do not swallow interrupted exceptions
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 33ed6c053ed1a1466a0f9e4698da6b54d881341c
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Fri Jan 12 10:57:38 2024 +0100
CAMEL-20297 camel-core-processor: do not swallow interrupted exceptions
---
.../java/org/apache/camel/processor/DelayProcessorSupport.java | 7 +++++++
.../main/java/org/apache/camel/processor/MulticastProcessor.java | 2 +-
.../src/main/java/org/apache/camel/processor/Resequencer.java | 1 +
.../main/java/org/apache/camel/processor/StreamResequencer.java | 2 ++
.../org/apache/camel/processor/aggregate/AggregateProcessor.java | 1 +
.../camel/processor/errorhandler/RedeliveryErrorHandler.java | 2 ++
6 files changed, 14 insertions(+), 1 deletion(-)
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
index 8f8c17b7037..337fc6043d7 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
@@ -102,6 +102,12 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
delay(delay, exchange);
// then continue routing
return processor.process(exchange, callback);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ // exception occurred so we are done
+ exchange.setException(e);
+ callback.done(true);
+ return true;
} catch (Exception e) {
// exception occurred so we are done
exchange.setException(e);
@@ -138,6 +144,7 @@ public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
delay(delay, exchange);
} catch (InterruptedException ie) {
exchange.setException(ie);
+ Thread.currentThread().interrupt();
}
// then continue routing
return processor.process(exchange, callback);
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 6efdaebff5d..0a2333ba89a 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -731,7 +731,7 @@ public class MulticastProcessor extends AsyncProcessorSupport
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
- // ignore
+ Thread.currentThread().interrupt();
}
runnable.run();
});
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
index 0375a20e41b..68585f92c50 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Resequencer.java
@@ -491,6 +491,7 @@ public class Resequencer extends AsyncProcessorSupport implements Navigate<Proce
}
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
break;
}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java
index 76730878569..2771df6c9fb 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/StreamResequencer.java
@@ -243,6 +243,7 @@ public class StreamResequencer extends AsyncProcessorSupport
try {
Thread.sleep(getTimeout());
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
// we were interrupted so break out
exchange.setException(e);
callback.done(true);
@@ -303,6 +304,7 @@ public class StreamResequencer extends AsyncProcessorSupport
deliveryRequestLock.unlock();
}
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
break;
}
try {
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index d97227d0d65..e278a5db1e4 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -1703,6 +1703,7 @@ public class AggregateProcessor extends AsyncProcessorSupport
try {
Thread.sleep(100);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
// break out as we got interrupted such as the JVM terminating
LOG.warn("Interrupted while waiting for {} inflight exchanges to complete.", getInProgressCompleteExchanges());
break;
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 60063459add..21c66770330 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -827,6 +827,8 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
// as we do not want to continue routing (for example a task has been cancelled)
exchange.setRouteStop(true);
reactiveExecutor.schedule(callback);
+
+ Thread.currentThread().interrupt();
}
}
} else {