You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/10/19 05:19:22 UTC
[camel] branch main updated: (chores) camel-disruptor: reduce method size
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 7978344cc84 (chores) camel-disruptor: reduce method size
7978344cc84 is described below
commit 7978344cc8417cf6e5116b3a4527338212dfcd80
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Oct 18 16:45:22 2023 +0200
(chores) camel-disruptor: reduce method size
This should help profilers to provide better reports and help JVM to inline methods when possible
---
.../component/disruptor/DisruptorConsumer.java | 41 ++++++-----
.../component/disruptor/DisruptorProducer.java | 79 +++++++++++-----------
2 files changed, 65 insertions(+), 55 deletions(-)
diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
index 1a8b1128b86..601789d2593 100644
--- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
+++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
@@ -162,31 +162,38 @@ public class DisruptorConsumer extends ServiceSupport implements Consumer, Suspe
// (see org.apache.camel.processor.CamelInternalProcessor.InternalCallback#done).
// To solve this problem, a new synchronization is set on the exchange that is to be
// processed
- result.getExchangeExtension().addOnCompletion(new Synchronization() {
- @Override
- public void onComplete(Exchange exchange) {
- synchronizedExchange.consumed(result);
- }
-
- @Override
- public void onFailure(Exchange exchange) {
- synchronizedExchange.consumed(result);
- }
- });
+ result.getExchangeExtension().addOnCompletion(newSynchronization(synchronizedExchange, result));
// As the necessary post-processing of the exchange is done by the registered Synchronization,
// we can suffice with a no-op AsyncCallback
processor.process(result, NOOP_ASYNC_CALLBACK);
} catch (Exception e) {
- Exchange exchange = synchronizedExchange.getExchange();
+ handleException(synchronizedExchange, e);
+ }
+ }
+
+ private static Synchronization newSynchronization(SynchronizedExchange synchronizedExchange, Exchange result) {
+ return new Synchronization() {
+ @Override
+ public void onComplete(Exchange exchange) {
+ synchronizedExchange.consumed(result);
+ }
- if (exchange != null) {
- getExceptionHandler().handleException("Error processing exchange",
- exchange, e);
- } else {
- getExceptionHandler().handleException(e);
+ @Override
+ public void onFailure(Exchange exchange) {
+ synchronizedExchange.consumed(result);
}
+ };
+ }
+
+ private void handleException(SynchronizedExchange synchronizedExchange, Exception e) {
+ Exchange exchange = synchronizedExchange.getExchange();
+
+ if (exchange != null) {
+ getExceptionHandler().handleException("Error processing exchange", exchange, e);
+ } else {
+ getExceptionHandler().handleException(e);
}
}
diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java
index 49a41a59132..220e9b86ab6 100644
--- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java
+++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java
@@ -43,9 +43,10 @@ public class DisruptorProducer extends DefaultAsyncProducer {
private final DisruptorEndpoint endpoint;
private boolean blockWhenFull;
- public DisruptorProducer(final DisruptorEndpoint endpoint,
- final WaitForTaskToComplete waitForTaskToComplete,
- final long timeout, boolean blockWhenFull) {
+ public DisruptorProducer(
+ final DisruptorEndpoint endpoint,
+ final WaitForTaskToComplete waitForTaskToComplete,
+ final long timeout, boolean blockWhenFull) {
super(endpoint);
this.waitForTaskToComplete = waitForTaskToComplete;
this.timeout = timeout;
@@ -85,41 +86,7 @@ public class DisruptorProducer extends DefaultAsyncProducer {
final CountDownLatch latch = new CountDownLatch(1);
// we should wait for the reply so install a on completion so we know when its complete
- copy.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() {
- @Override
- public void onDone(final Exchange response) {
- // check for timeout, which then already would have invoked the latch
- if (latch.getCount() == 0) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("{}. Timeout occurred so response will be ignored: {}", this,
- response.getMessage());
- }
- } else {
- if (LOG.isTraceEnabled()) {
- LOG.trace("{} with response: {}", this,
- response.getMessage());
- }
- try {
- ExchangeHelper.copyResults(exchange, response);
- } finally {
- // always ensure latch is triggered
- latch.countDown();
- }
- }
- }
-
- @Override
- public boolean allowHandover() {
- // do not allow handover as we want to seda producer to have its completion triggered
- // at this point in the routing (at this leg), instead of at the very last (this ensure timeout is honored)
- return false;
- }
-
- @Override
- public String toString() {
- return "onDone at endpoint: " + endpoint;
- }
- });
+ copy.getExchangeExtension().addOnCompletion(newOnCompletion(exchange, latch));
doPublish(copy);
@@ -176,6 +143,42 @@ public class DisruptorProducer extends DefaultAsyncProducer {
return true;
}
+ private SynchronizationAdapter newOnCompletion(Exchange exchange, CountDownLatch latch) {
+ return new SynchronizationAdapter() {
+ @Override
+ public void onDone(final Exchange response) {
+ // check for timeout, which then already would have invoked the latch
+ if (latch.getCount() == 0) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{}. Timeout occurred so response will be ignored: {}", this, response.getMessage());
+ }
+ } else {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{} with response: {}", this, response.getMessage());
+ }
+ try {
+ ExchangeHelper.copyResults(exchange, response);
+ } finally {
+ // always ensure latch is triggered
+ latch.countDown();
+ }
+ }
+ }
+
+ @Override
+ public boolean allowHandover() {
+ // do not allow handover as we want to seda producer to have its completion triggered
+ // at this point in the routing (at this leg), instead of at the very last (this ensure timeout is honored)
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return "onDone at endpoint: " + endpoint;
+ }
+ };
+ }
+
private void doPublish(Exchange exchange) {
LOG.trace("Publishing Exchange to disruptor ringbuffer: {}", exchange);