You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/10/19 05:18:51 UTC
[camel] branch main updated: (chores) camel-core: reduce method size to force inlining
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 50ec2d6a150 (chores) camel-core: reduce method size to force inlining
50ec2d6a150 is described below
commit 50ec2d6a150d26868719edde03a5cc35c30e1556
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Tue Oct 17 18:46:23 2023 +0200
(chores) camel-core: reduce method size to force inlining
Reduce the size of a few hot methods so that their bytecode size is smaller than 325 bytes (the default HotSpot FreqInlineSize threshold on most platforms), increasing the chance that the JVM will inline them when possible.
---
.../camel/impl/engine/CamelInternalProcessor.java | 151 ++++++++++++---------
.../camel/impl/engine/DefaultReactiveExecutor.java | 88 +++++++-----
.../impl/engine/DefaultStreamCachingStrategy.java | 58 ++++----
.../impl/engine/SharedCamelInternalProcessor.java | 116 +++++++++-------
.../camel/impl/engine/StreamCachingHelper.java | 27 ++--
.../errorhandler/RedeliveryErrorHandler.java | 60 ++++----
.../camel/main/MainDurationEventNotifier.java | 66 +++++----
.../org/apache/camel/support/ExchangeHelper.java | 83 ++++++-----
.../org/apache/camel/support/MessageSupport.java | 11 +-
.../org/apache/camel/support/UnitOfWorkHelper.java | 27 ++--
10 files changed, 396 insertions(+), 291 deletions(-)
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index af24a796b0f..e343e51536e 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -285,7 +285,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
// in between the:
// CAMEL END USER - DEBUG ME HERE +++ START +++
// CAMEL END USER - DEBUG ME HERE +++ END +++
- // you can see in the code below.
+ // you can see in the code below within the processTransacted or processNonTransacted methods.
// ----------------------------------------------------------
if (processor == null || exchange.isRouteStop()) {
@@ -295,15 +295,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
}
if (shutdownStrategy.isForceShutdown()) {
- String msg = "Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: "
- + exchange;
- LOG.debug(msg);
- if (exchange.getException() == null) {
- exchange.setException(new RejectedExecutionException(msg));
- }
- // force shutdown so we should not continue
- originalCallback.done(true);
- return true;
+ return processShutdown(exchange, originalCallback);
}
Object[] states;
@@ -328,76 +320,104 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
states[j++] = state;
}
} catch (Exception e) {
- // error in before so break out
- exchange.setException(e);
- try {
- originalCallback.done(true);
- } finally {
- // task is done so reset
- if (taskFactory != null) {
- taskFactory.release(afterTask);
- }
- }
- return true;
+ return handleException(exchange, originalCallback, e, afterTask);
}
}
if (exchange.isTransacted()) {
- // must be synchronized for transacted exchanges
- if (LOG.isTraceEnabled()) {
- LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(),
- exchange);
- }
- try {
- // ----------------------------------------------------------
- // CAMEL END USER - DEBUG ME HERE +++ START +++
- // ----------------------------------------------------------
- processor.process(exchange);
- // ----------------------------------------------------------
- // CAMEL END USER - DEBUG ME HERE +++ END +++
- // ----------------------------------------------------------
- } catch (Exception e) {
- exchange.setException(e);
- } finally {
- // processing is done
- afterTask.done(true);
- }
- // we are done synchronously - must return true
- return true;
+ return processTransacted(exchange, afterTask);
} else {
- final UnitOfWork uow = exchange.getUnitOfWork();
+ return processNonTransacted(exchange, afterTask);
+ }
+ }
- // optimize to only do before uow processing if really needed
- AsyncCallback async = afterTask;
- boolean beforeAndAfter = uow != null && uow.isBeforeAfterProcess();
- if (beforeAndAfter) {
- async = uow.beforeProcess(processor, exchange, async);
- }
+ private static boolean processShutdown(Exchange exchange, AsyncCallback originalCallback) {
+ String msg = "Run not allowed as ShutdownStrategy is forcing shutting down, will reject executing exchange: "
+ + exchange;
+ LOG.debug(msg);
+ if (exchange.getException() == null) {
+ exchange.setException(new RejectedExecutionException(msg));
+ }
+ // force shutdown so we should not continue
+ originalCallback.done(true);
+ return true;
+ }
+
+ private boolean processNonTransacted(Exchange exchange, CamelInternalTask afterTask) {
+ final AsyncCallback async = beforeProcess(exchange, afterTask);
+
+ // ----------------------------------------------------------
+ // CAMEL END USER - DEBUG ME HERE +++ START +++
+ // ----------------------------------------------------------
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing exchange for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
+ }
+ boolean sync = processor.process(exchange, async);
+ if (!sync) {
+ EventHelper.notifyExchangeAsyncProcessingStartedEvent(camelContext, exchange);
+ }
+
+ // ----------------------------------------------------------
+ // CAMEL END USER - DEBUG ME HERE +++ END +++
+ // ----------------------------------------------------------
+ // CAMEL-18255: move uow.afterProcess handling to the callback
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}",
+ sync ? "synchronously" : "asynchronously",
+ exchange.getExchangeId(), exchange);
+ }
+ return sync;
+ }
+
+ private AsyncCallback beforeProcess(Exchange exchange, CamelInternalTask afterTask) {
+ final UnitOfWork uow = exchange.getUnitOfWork();
+
+ // optimize to only do before uow processing if really needed
+ if (uow != null && uow.isBeforeAfterProcess()) {
+ return uow.beforeProcess(processor, exchange, afterTask);
+ }
+ return afterTask;
+ }
+
+ private boolean processTransacted(Exchange exchange, CamelInternalTask afterTask) {
+ // must be synchronized for transacted exchanges
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}", exchange.getExchangeId(),
+ exchange);
+ }
+ try {
// ----------------------------------------------------------
// CAMEL END USER - DEBUG ME HERE +++ START +++
// ----------------------------------------------------------
- if (LOG.isTraceEnabled()) {
- LOG.trace("Processing exchange for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
- }
- boolean sync = processor.process(exchange, async);
- if (!sync) {
- EventHelper.notifyExchangeAsyncProcessingStartedEvent(camelContext, exchange);
- }
-
+ processor.process(exchange);
// ----------------------------------------------------------
// CAMEL END USER - DEBUG ME HERE +++ END +++
// ----------------------------------------------------------
+ } catch (Exception e) {
+ exchange.setException(e);
+ } finally {
+ // processing is done
+ afterTask.done(true);
+ }
+ // we are done synchronously - must return true
+ return true;
+ }
- // CAMEL-18255: move uow.afterProcess handling to the callback
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}",
- sync ? "synchronously" : "asynchronously",
- exchange.getExchangeId(), exchange);
+ private boolean handleException(
+ Exchange exchange, AsyncCallback originalCallback, Exception e, CamelInternalTask afterTask) {
+ // error in before so break out
+ exchange.setException(e);
+ try {
+ originalCallback.done(true);
+ } finally {
+ // task is done so reset
+ if (taskFactory != null) {
+ taskFactory.release(afterTask);
}
- return sync;
}
+ return true;
}
@Override
@@ -820,9 +840,10 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor implements In
}
// only return UnitOfWork if we created a new as then its us that handle the lifecycle to done the created UoW
- UnitOfWork created = null;
+
UnitOfWork uow = exchange.getUnitOfWork();
+ UnitOfWork created = null;
if (uow == null) {
// If there is no existing UoW, then we should start one and
// terminate it once processing is completed for the exchange.
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
index b0f2806917b..596565f4651 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
@@ -136,37 +136,37 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
LOG.trace("Schedule [first={}, main={}, sync={}]: {}", first, main, sync, runnable);
}
if (main) {
- if (!queue.isEmpty()) {
- if (back == null) {
- back = new ArrayDeque<>();
- }
- back.push(queue);
- queue = new ArrayDeque<>();
- }
+ executeMainFlow();
}
if (first) {
queue.addFirst(runnable);
- if (stats) {
- executor.pendingTasks.increment();
- }
} else {
queue.addLast(runnable);
- if (stats) {
- executor.pendingTasks.increment();
+ }
+
+ incrementPendingTasks();
+ tryExecuteReactiveWork(runnable, sync);
+ }
+
+ private void executeMainFlow() {
+ if (!queue.isEmpty()) {
+ if (back == null) {
+ back = new ArrayDeque<>();
}
+ back.push(queue);
+ queue = new ArrayDeque<>();
}
+ }
+
+ private void tryExecuteReactiveWork(Runnable runnable, boolean sync) {
if (!running || sync) {
running = true;
- if (stats) {
- executor.runningWorkers.increment();
- }
+ incrementRunningWorkers();
try {
executeReactiveWork();
} finally {
running = false;
- if (stats) {
- executor.runningWorkers.decrement();
- }
+ decrementRunningWorkers();
}
} else {
if (LOG.isTraceEnabled()) {
@@ -186,18 +186,44 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
break;
}
}
- try {
- if (stats) {
- executor.pendingTasks.decrement();
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("Worker #{} running: {}", number, polled);
- }
- polled.run();
- } catch (Exception t) {
- LOG.warn("Error executing reactive work due to {}. This exception is ignored.",
- t.getMessage(), t);
+ doRun(polled);
+ }
+ }
+
+ private void doRun(Runnable polled) {
+ try {
+ decrementPendingTasks();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Worker #{} running: {}", number, polled);
}
+ polled.run();
+ } catch (Exception t) {
+ LOG.warn("Error executing reactive work due to {}. This exception is ignored.",
+ t.getMessage(), t);
+ }
+ }
+
+ private void decrementRunningWorkers() {
+ if (stats) {
+ executor.runningWorkers.decrement();
+ }
+ }
+
+ private void incrementRunningWorkers() {
+ if (stats) {
+ executor.runningWorkers.increment();
+ }
+ }
+
+ private void incrementPendingTasks() {
+ if (stats) {
+ executor.pendingTasks.increment();
+ }
+ }
+
+ private void decrementPendingTasks() {
+ if (stats) {
+ executor.pendingTasks.decrement();
}
}
@@ -207,9 +233,7 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE
return false;
}
try {
- if (stats) {
- executor.pendingTasks.decrement();
- }
+ decrementPendingTasks();
if (LOG.isTraceEnabled()) {
LOG.trace("Running: {}", polled);
}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java
index f7513cb65f2..4c0919f7b7e 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultStreamCachingStrategy.java
@@ -260,21 +260,7 @@ public class DefaultStreamCachingStrategy extends ServiceSupport implements Came
if (body != null) {
boolean allowed = allowClasses == null && denyClasses == null;
if (!allowed) {
- Class<?> source = body.getClass();
- if (denyClasses != null && allowClasses != null) {
- // deny takes precedence
- allowed = !isAssignableFrom(source, denyClasses);
- if (allowed) {
- allowed = isAssignableFrom(source, allowClasses);
- }
- } else if (denyClasses != null) {
- allowed = !isAssignableFrom(source, denyClasses);
- } else {
- allowed = isAssignableFrom(source, allowClasses);
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("Cache stream from class: {} is {}", source, allowed ? "allowed" : "denied");
- }
+ allowed = checkAllowDenyList(body);
}
if (allowed) {
cache = camelContext.getTypeConverter().convertTo(StreamCache.class, message.getExchange(), body);
@@ -285,20 +271,44 @@ public class DefaultStreamCachingStrategy extends ServiceSupport implements Came
LOG.trace("Cached stream to {} -> {}", cache.inMemory() ? "memory" : "spool", cache);
}
if (statistics.isStatisticsEnabled()) {
- try {
- if (cache.inMemory()) {
- statistics.updateMemory(cache.length());
- } else {
- statistics.updateSpool(cache.length());
- }
- } catch (Exception e) {
- LOG.debug("Error updating cache statistics. This exception is ignored.", e);
- }
+ computeStatistics(cache);
}
}
return cache;
}
+ private boolean checkAllowDenyList(Object body) {
+ boolean allowed;
+ Class<?> source = body.getClass();
+ if (denyClasses != null && allowClasses != null) {
+ // deny takes precedence
+ allowed = !isAssignableFrom(source, denyClasses);
+ if (allowed) {
+ allowed = isAssignableFrom(source, allowClasses);
+ }
+ } else if (denyClasses != null) {
+ allowed = !isAssignableFrom(source, denyClasses);
+ } else {
+ allowed = isAssignableFrom(source, allowClasses);
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Cache stream from class: {} is {}", source, allowed ? "allowed" : "denied");
+ }
+ return allowed;
+ }
+
+ private void computeStatistics(StreamCache cache) {
+ try {
+ if (cache.inMemory()) {
+ statistics.updateMemory(cache.length());
+ } else {
+ statistics.updateSpool(cache.length());
+ }
+ } catch (Exception e) {
+ LOG.debug("Error updating cache statistics. This exception is ignored.", e);
+ }
+ }
+
protected static boolean isAssignableFrom(Class<?> source, Collection<Class<?>> targets) {
for (Class<?> target : targets) {
if (target.isAssignableFrom(source)) {
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java
index f8b3de187e9..d64b2c174cb 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java
@@ -160,9 +160,7 @@ public class SharedCamelInternalProcessor implements SharedInternalProcessor {
states[j++] = state;
}
} catch (Exception e) {
- exchange.setException(e);
- originalCallback.done(true);
- return true;
+ return handleException(exchange, originalCallback, e);
}
}
@@ -170,63 +168,77 @@ public class SharedCamelInternalProcessor implements SharedInternalProcessor {
AsyncCallback callback = new InternalCallback(states, exchange, originalCallback, resultProcessor);
if (exchange.isTransacted()) {
- // must be synchronized for transacted exchanges
- if (LOG.isTraceEnabled()) {
- if (exchange.isTransacted()) {
- LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}",
- exchange.getExchangeId(), exchange);
- } else {
- LOG.trace("Synchronous UnitOfWork Exchange must be routed synchronously for exchangeId: {} -> {}",
- exchange.getExchangeId(), exchange);
- }
- }
- // ----------------------------------------------------------
- // CAMEL END USER - DEBUG ME HERE +++ START +++
- // ----------------------------------------------------------
- try {
- processor.process(exchange);
- } catch (Exception e) {
- exchange.setException(e);
- }
- // ----------------------------------------------------------
- // CAMEL END USER - DEBUG ME HERE +++ END +++
- // ----------------------------------------------------------
- callback.done(true);
- return true;
+ return processTransacted(exchange, processor, callback);
} else {
- final UnitOfWork uow = exchange.getUnitOfWork();
+ return processNonTransacted(exchange, processor, callback);
+ }
+ }
- // do uow before processing and if a value is returned then the uow wants to be processed after in the same thread
- AsyncCallback async = callback;
- boolean beforeAndAfter = uow.isBeforeAfterProcess();
- if (beforeAndAfter) {
- async = uow.beforeProcess(processor, exchange, async);
- }
+ private static boolean handleException(Exchange exchange, AsyncCallback originalCallback, Exception e) {
+ exchange.setException(e);
+ originalCallback.done(true);
+ return true;
+ }
- // ----------------------------------------------------------
- // CAMEL END USER - DEBUG ME HERE +++ START +++
- // ----------------------------------------------------------
- if (LOG.isTraceEnabled()) {
- LOG.trace("Processing exchange for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
- }
- boolean sync = processor.process(exchange, async);
- // ----------------------------------------------------------
- // CAMEL END USER - DEBUG ME HERE +++ END +++
- // ----------------------------------------------------------
+ private static boolean processNonTransacted(Exchange exchange, AsyncProcessor processor, AsyncCallback callback) {
+ final UnitOfWork uow = exchange.getUnitOfWork();
- // optimize to only do after uow processing if really needed
- if (beforeAndAfter) {
- // execute any after processor work (in current thread, not in the callback)
- uow.afterProcess(processor, exchange, callback, sync);
- }
+ // do uow before processing and if a value is returned then the uow wants to be processed after in the same thread
+ AsyncCallback async = callback;
+ boolean beforeAndAfter = uow.isBeforeAfterProcess();
+ if (beforeAndAfter) {
+ async = uow.beforeProcess(processor, exchange, async);
+ }
+
+ // ----------------------------------------------------------
+ // CAMEL END USER - DEBUG ME HERE +++ START +++
+ // ----------------------------------------------------------
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processing exchange for exchangeId: {} -> {}", exchange.getExchangeId(), exchange);
+ }
+ boolean sync = processor.process(exchange, async);
+ // ----------------------------------------------------------
+ // CAMEL END USER - DEBUG ME HERE +++ END +++
+ // ----------------------------------------------------------
+
+ // optimize to only do after uow processing if really needed
+ if (beforeAndAfter) {
+ // execute any after processor work (in current thread, not in the callback)
+ uow.afterProcess(processor, exchange, callback, sync);
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}",
+ sync ? "synchronously" : "asynchronously",
+ exchange.getExchangeId(), exchange);
+ }
+ return sync;
+ }
- if (LOG.isTraceEnabled()) {
- LOG.trace("Exchange processed and is continued routed {} for exchangeId: {} -> {}",
- sync ? "synchronously" : "asynchronously",
+ private static boolean processTransacted(Exchange exchange, AsyncProcessor processor, AsyncCallback callback) {
+ // must be synchronized for transacted exchanges
+ if (LOG.isTraceEnabled()) {
+ if (exchange.isTransacted()) {
+ LOG.trace("Transacted Exchange must be routed synchronously for exchangeId: {} -> {}",
+ exchange.getExchangeId(), exchange);
+ } else {
+ LOG.trace("Synchronous UnitOfWork Exchange must be routed synchronously for exchangeId: {} -> {}",
exchange.getExchangeId(), exchange);
}
- return sync;
}
+ // ----------------------------------------------------------
+ // CAMEL END USER - DEBUG ME HERE +++ START +++
+ // ----------------------------------------------------------
+ try {
+ processor.process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+ // ----------------------------------------------------------
+ // CAMEL END USER - DEBUG ME HERE +++ END +++
+ // ----------------------------------------------------------
+ callback.done(true);
+ return true;
}
/**
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/StreamCachingHelper.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/StreamCachingHelper.java
index 63fba73c035..d9ba7c9e0b2 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/StreamCachingHelper.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/StreamCachingHelper.java
@@ -45,12 +45,7 @@ final class StreamCachingHelper {
return sc;
}
} catch (Exception e) {
- // lets allow Camels error handler to deal with stream cache failures
- StreamCacheException tce = new StreamCacheException(null, e);
- exchange.setException(tce);
- // because this is stream caching error then we cannot use redelivery as the message body is corrupt
- // so mark as redelivery exhausted
- exchange.getExchangeExtension().setRedeliveryExhausted(true);
+ handleException(exchange, null, e);
}
// check if we somewhere failed due to a stream caching exception
Throwable cause = exchange.getException();
@@ -76,15 +71,23 @@ final class StreamCachingHelper {
}
return sc;
} catch (Exception e) {
- // lets allow Camels error handler to deal with stream cache failures
- StreamCacheException tce = new StreamCacheException(exchange.getMessage().getBody(), e);
- exchange.setException(tce);
- // because this is stream caching error then we cannot use redelivery as the message body is corrupt
- // so mark as redelivery exhausted
- exchange.getExchangeExtension().setRedeliveryExhausted(true);
+ handleException(exchange, e);
}
}
return null;
}
+ private static void handleException(Exchange exchange, Exception e) {
+ handleException(exchange, exchange.getMessage().getBody(), e);
+ }
+
+ private static void handleException(Exchange exchange, Object value, Exception e) {
+ // lets allow Camels error handler to deal with stream cache failures
+ StreamCacheException tce = new StreamCacheException(value, e);
+ exchange.setException(tce);
+ // because this is stream caching error then we cannot use redelivery as the message body is corrupt
+ // so mark as redelivery exhausted
+ exchange.getExchangeExtension().setRedeliveryExhausted(true);
+ }
+
}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 2a41f121901..a35f89616cb 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -428,26 +428,11 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
run = false;
}
if (!run) {
- LOG.trace("Run not allowed, will reject executing exchange: {}", exchange);
- if (exchange.getException() == null) {
- exchange.setException(new RejectedExecutionException());
- }
- AsyncCallback cb = callback;
- taskFactory.release(this);
- cb.done(false);
+ runNotAllowed();
return;
}
if (exchange.getExchangeExtension().isInterrupted()) {
- // mark the exchange to stop continue routing when interrupted
- // as we do not want to continue routing (for example a task has been cancelled)
- if (LOG.isTraceEnabled()) {
- LOG.trace("Is exchangeId: {} interrupted? true", exchange.getExchangeId());
- }
- exchange.setRouteStop(true);
- // we should not continue routing so call callback
- AsyncCallback cb = callback;
- taskFactory.release(this);
- cb.done(false);
+ runInterrupted();
return;
}
@@ -462,13 +447,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
if (failure || bridge) {
// previous processing cause an exception
- handleException();
- onExceptionOccurred();
- prepareExchangeAfterFailure(exchange);
- // we do not support redelivery so continue callback
- AsyncCallback cb = callback;
- taskFactory.release(this);
- reactiveExecutor.schedule(cb);
+ handlePreviousFailure();
} else if (first) {
// first time call the target processor
first = false;
@@ -481,6 +460,39 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
}
}
+ private void handlePreviousFailure() {
+ handleException();
+ onExceptionOccurred();
+ prepareExchangeAfterFailure(exchange);
+ // we do not support redelivery so continue callback
+ AsyncCallback cb = callback;
+ taskFactory.release(this);
+ reactiveExecutor.schedule(cb);
+ }
+
+ private void runInterrupted() {
+ // mark the exchange to stop continue routing when interrupted
+ // as we do not want to continue routing (for example a task has been cancelled)
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Is exchangeId: {} interrupted? true", exchange.getExchangeId());
+ }
+ exchange.setRouteStop(true);
+ // we should not continue routing so call callback
+ AsyncCallback cb = callback;
+ taskFactory.release(this);
+ cb.done(false);
+ }
+
+ private void runNotAllowed() {
+ LOG.trace("Run not allowed, will reject executing exchange: {}", exchange);
+ if (exchange.getException() == null) {
+ exchange.setException(new RejectedExecutionException());
+ }
+ AsyncCallback cb = callback;
+ taskFactory.release(this);
+ cb.done(false);
+ }
+
protected void handleException() {
Exception e = exchange.getException();
// e is never null
diff --git a/core/camel-main/src/main/java/org/apache/camel/main/MainDurationEventNotifier.java b/core/camel-main/src/main/java/org/apache/camel/main/MainDurationEventNotifier.java
index 17fd88f0508..1a3dacc4901 100644
--- a/core/camel-main/src/main/java/org/apache/camel/main/MainDurationEventNotifier.java
+++ b/core/camel-main/src/main/java/org/apache/camel/main/MainDurationEventNotifier.java
@@ -79,17 +79,8 @@ public class MainDurationEventNotifier extends EventNotifierSupport {
return;
}
- final boolean reloaded = event.getType() == CamelEvent.Type.RouteReloaded;
-
- if (reloaded) {
- if (restartDuration) {
- LOG.debug("Routes reloaded. Resetting maxMessages/maxIdleSeconds/maxSeconds");
- shutdownStrategy.restartAwait();
- doneMessages.reset();
- if (watch != null) {
- watch.restart();
- }
- }
+ if (event.getType() == CamelEvent.Type.RouteReloaded) {
+ resetOnReload();
return;
}
@@ -108,30 +99,49 @@ public class MainDurationEventNotifier extends EventNotifierSupport {
}
if (result && shutdownStrategy.isRunAllowed()) {
- if ("shutdown".equalsIgnoreCase(action)) {
- LOG.info("Duration max messages triggering shutdown of the JVM");
- // use thread to shut down Camel as otherwise we would block current thread
- camelContext.getExecutorServiceManager().newThread("CamelMainShutdownCamelContext", this::shutdownTask)
- .start();
- } else if ("stop".equalsIgnoreCase(action)) {
- LOG.info("Duration max messages triggering stopping all routes");
- // use thread to stop routes as otherwise we would block current thread
- camelContext.getExecutorServiceManager().newThread("CamelMainShutdownCamelContext", this::stopTask)
- .start();
- }
+ triggerDoneEvent();
}
}
}
// idle reacts on both incoming and complete messages
if (maxIdleSeconds > 0) {
- final boolean begin = event.getType() == CamelEvent.Type.ExchangeCreated;
+ resetOnActivity(event, complete);
+ }
+ }
- if (begin || complete) {
- if (watch != null) {
- LOG.trace("Message activity so restarting stop watch");
- watch.restart();
- }
+ private void triggerDoneEvent() {
+ if ("shutdown".equalsIgnoreCase(action)) {
+ LOG.info("Duration max messages triggering shutdown of the JVM");
+ // use thread to shut down Camel as otherwise we would block current thread
+ camelContext.getExecutorServiceManager().newThread("CamelMainShutdownCamelContext", this::shutdownTask)
+ .start();
+ } else if ("stop".equalsIgnoreCase(action)) {
+ LOG.info("Duration max messages triggering stopping all routes");
+ // use thread to stop routes as otherwise we would block current thread
+ camelContext.getExecutorServiceManager().newThread("CamelMainShutdownCamelContext", this::stopTask)
+ .start();
+ }
+ }
+
+ private void resetOnActivity(CamelEvent event, boolean complete) {
+ final boolean created = event.getType() == CamelEvent.Type.ExchangeCreated;
+
+ if (created || complete) {
+ if (watch != null) {
+ LOG.trace("Message activity so restarting stop watch");
+ watch.restart();
+ }
+ }
+ }
+
+ private void resetOnReload() {
+ if (restartDuration) {
+ LOG.debug("Routes reloaded. Resetting maxMessages/maxIdleSeconds/maxSeconds");
+ shutdownStrategy.restartAwait();
+ doneMessages.reset();
+ if (watch != null) {
+ watch.restart();
}
}
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index 32cd22eec71..0a2da86ec14 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -34,6 +34,7 @@ import org.apache.camel.CamelContext;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeExtension;
import org.apache.camel.ExchangePattern;
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.Message;
@@ -343,58 +344,74 @@ public final class ExchangeHelper {
private static void doCopyResults(Exchange result, Exchange source, boolean preserverPattern) {
if (result == source) {
- // we just need to ensure MEP is as expected (eg copy result to OUT if out capable)
- // and the result is not failed
- if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) {
- // copy IN to OUT as we expect a OUT response
- result.getOut().copyFrom(source.getIn());
- }
+ copyFromOutMessageConditionally(result, source);
return;
}
if (source.hasOut()) {
- if (preserverPattern) {
- // exchange pattern sensitive
- Message resultMessage = getResultMessage(result);
- resultMessage.copyFrom(source.getOut());
- } else {
- result.getOut().copyFrom(source.getOut());
- }
+ copyFromOutMessage(result, source, preserverPattern);
} else {
- // no results so lets copy the last input
- // as the final processor on a pipeline might not
- // have created any OUT; such as a mock:endpoint
- // so lets assume the last IN is the OUT
- if (!preserverPattern && result.getPattern().isOutCapable()) {
- // only set OUT if its OUT capable
- result.getOut().copyFrom(source.getIn());
- } else {
- // if not replace IN instead to keep the MEP
- result.getIn().copyFrom(source.getIn());
- // clear any existing OUT as the result is on the IN
- if (result.hasOut()) {
- result.setOut(null);
- }
- }
+ copyFromInMessage(result, source, preserverPattern);
}
if (source.hasProperties()) {
result.getProperties().putAll(source.getProperties());
}
- source.getExchangeExtension().copyInternalProperties(result);
- source.getExchangeExtension().copySafeCopyPropertiesTo(result.getExchangeExtension());
+
+ final ExchangeExtension sourceExtension = source.getExchangeExtension();
+ sourceExtension.copyInternalProperties(result);
+
+ final ExchangeExtension resultExtension = result.getExchangeExtension();
+ sourceExtension.copySafeCopyPropertiesTo(resultExtension);
// copy over state
result.setRouteStop(source.isRouteStop());
result.setRollbackOnly(source.isRollbackOnly());
result.setRollbackOnlyLast(source.isRollbackOnlyLast());
- result.getExchangeExtension().setNotifyEvent(source.getExchangeExtension().isNotifyEvent());
- result.getExchangeExtension().setRedeliveryExhausted(source.getExchangeExtension().isRedeliveryExhausted());
- result.getExchangeExtension().setErrorHandlerHandled(source.getExchangeExtension().getErrorHandlerHandled());
+ resultExtension.setNotifyEvent(sourceExtension.isNotifyEvent());
+ resultExtension.setRedeliveryExhausted(sourceExtension.isRedeliveryExhausted());
+ resultExtension.setErrorHandlerHandled(sourceExtension.getErrorHandlerHandled());
result.setException(source.getException());
}
+ private static void copyFromOutMessageConditionally(Exchange result, Exchange source) {
+ // we just need to ensure MEP is as expected (eg copy result to OUT if out capable)
+ // and the result is not failed
+ if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) {
+ // copy IN to OUT as we expect an OUT response
+ result.getOut().copyFrom(source.getIn());
+ }
+ }
+
+ private static void copyFromInMessage(Exchange result, Exchange source, boolean preserverPattern) {
+ // no results, so let's copy the last input
+ // as the final processor on a pipeline might not
+ // have created any OUT (such as a mock:endpoint),
+ // so let's assume the last IN is the OUT
+ if (!preserverPattern && result.getPattern().isOutCapable()) {
+ // only set OUT if it's OUT capable
+ result.getOut().copyFrom(source.getIn());
+ } else {
+ // if not, replace IN instead to keep the MEP
+ result.getIn().copyFrom(source.getIn());
+ // clear any existing OUT as the result is on the IN
+ if (result.hasOut()) {
+ result.setOut(null);
+ }
+ }
+ }
+
+ private static void copyFromOutMessage(Exchange result, Exchange source, boolean preserverPattern) {
+ if (preserverPattern) {
+ // exchange pattern sensitive
+ Message resultMessage = getResultMessage(result);
+ resultMessage.copyFrom(source.getOut());
+ } else {
+ result.getOut().copyFrom(source.getOut());
+ }
+ }
+
/**
* Returns the message where to write results in an exchange-pattern-sensitive way.
*
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/MessageSupport.java b/core/camel-support/src/main/java/org/apache/camel/support/MessageSupport.java
index 97caf1aa730..f1813d0b32b 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/MessageSupport.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/MessageSupport.java
@@ -217,12 +217,7 @@ public abstract class MessageSupport implements Message, CamelContextAware, Data
// the headers may be the same instance if the end user has made some mistake
// and set the OUT message with the same header instance of the IN message etc
- boolean sameHeadersInstance = false;
- if (hasHeaders() && that.hasHeaders() && getHeaders() == that.getHeaders()) {
- sameHeadersInstance = true;
- }
-
- if (!sameHeadersInstance) {
+ if (!sameHeaders(that)) {
if (hasHeaders()) {
// okay its safe to clear the headers
getHeaders().clear();
@@ -233,6 +228,10 @@ public abstract class MessageSupport implements Message, CamelContextAware, Data
}
}
+ private boolean sameHeaders(Message that) {
+ return hasHeaders() && that.hasHeaders() && getHeaders() == that.getHeaders();
+ }
+
@Override
public Exchange getExchange() {
return exchange;
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java
index 5450dddfcd5..e3b32b56a20 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java
@@ -64,12 +64,7 @@ public final class UnitOfWorkHelper {
if (synchronizations.size() > 1) {
// work on a copy of the list to avoid any modification which may cause ConcurrentModificationException
- List<Synchronization> copy = new ArrayList<>(synchronizations);
-
- // reverse so we invoke it FILO style instead of FIFO
- Collections.reverse(copy);
- // and honor if any was ordered by sorting it accordingly
- copy.sort(OrderedComparator.get());
+ final List<Synchronization> copy = safeCopy(synchronizations);
boolean failed = exchange.isFailed();
@@ -101,6 +96,13 @@ public final class UnitOfWorkHelper {
public static void beforeRouteSynchronizations(
Route route, Exchange exchange, List<Synchronization> synchronizations, Logger log) {
// work on a copy of the list to avoid any modification which may cause ConcurrentModificationException
+ final List<Synchronization> copy = safeCopy(synchronizations);
+
+ // invoke synchronization callbacks
+ invokeSynchronizationCallbacks(route, exchange, log, copy);
+ }
+
+ private static List<Synchronization> safeCopy(List<Synchronization> synchronizations) {
List<Synchronization> copy = new ArrayList<>(synchronizations);
if (copy.size() > 1) {
@@ -109,8 +111,10 @@ public final class UnitOfWorkHelper {
// and honor if any was ordered by sorting it accordingly
copy.sort(OrderedComparator.get());
}
+ return copy;
+ }
- // invoke synchronization callbacks
+ private static void invokeSynchronizationCallbacks(Route route, Exchange exchange, Logger log, List<Synchronization> copy) {
for (Synchronization synchronization : copy) {
final SynchronizationRouteAware routeSynchronization = synchronization.getRouteSynchronization();
if (routeSynchronization != null) {
@@ -128,14 +132,7 @@ public final class UnitOfWorkHelper {
public static void afterRouteSynchronizations(
Route route, Exchange exchange, List<Synchronization> synchronizations, Logger log) {
// work on a copy of the list to avoid any modification which may cause ConcurrentModificationException
- List<Synchronization> copy = new ArrayList<>(synchronizations);
-
- if (copy.size() > 1) {
- // reverse so we invoke it FILO style instead of FIFO
- Collections.reverse(copy);
- // and honor if any was ordered by sorting it accordingly
- copy.sort(OrderedComparator.get());
- }
+ final List<Synchronization> copy = safeCopy(synchronizations);
// invoke synchronization callbacks
for (Synchronization synchronization : copy) {