You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/10/19 05:19:22 UTC

[camel] branch main updated: (chores) camel-disruptor: reduce method size

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 7978344cc84 (chores) camel-disruptor: reduce method size
7978344cc84 is described below

commit 7978344cc8417cf6e5116b3a4527338212dfcd80
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Wed Oct 18 16:45:22 2023 +0200

    (chores) camel-disruptor: reduce method size
    
    This should help profilers to provide better reports and help JVM to inline methods when possible
---
 .../component/disruptor/DisruptorConsumer.java     | 41 ++++++-----
 .../component/disruptor/DisruptorProducer.java     | 79 +++++++++++-----------
 2 files changed, 65 insertions(+), 55 deletions(-)

diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
index 1a8b1128b86..601789d2593 100644
--- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
+++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
@@ -162,31 +162,38 @@ public class DisruptorConsumer extends ServiceSupport implements Consumer, Suspe
             // (see org.apache.camel.processor.CamelInternalProcessor.InternalCallback#done).
             // To solve this problem, a new synchronization is set on the exchange that is to be
             // processed
-            result.getExchangeExtension().addOnCompletion(new Synchronization() {
-                @Override
-                public void onComplete(Exchange exchange) {
-                    synchronizedExchange.consumed(result);
-                }
-
-                @Override
-                public void onFailure(Exchange exchange) {
-                    synchronizedExchange.consumed(result);
-                }
-            });
+            result.getExchangeExtension().addOnCompletion(newSynchronization(synchronizedExchange, result));
 
             // As the necessary post-processing of the exchange is done by the registered Synchronization,
             // we can suffice with a no-op AsyncCallback
             processor.process(result, NOOP_ASYNC_CALLBACK);
 
         } catch (Exception e) {
-            Exchange exchange = synchronizedExchange.getExchange();
+            handleException(synchronizedExchange, e);
+        }
+    }
+
+    private static Synchronization newSynchronization(SynchronizedExchange synchronizedExchange, Exchange result) {
+        return new Synchronization() {
+            @Override
+            public void onComplete(Exchange exchange) {
+                synchronizedExchange.consumed(result);
+            }
 
-            if (exchange != null) {
-                getExceptionHandler().handleException("Error processing exchange",
-                        exchange, e);
-            } else {
-                getExceptionHandler().handleException(e);
+            @Override
+            public void onFailure(Exchange exchange) {
+                synchronizedExchange.consumed(result);
             }
+        };
+    }
+
+    private void handleException(SynchronizedExchange synchronizedExchange, Exception e) {
+        Exchange exchange = synchronizedExchange.getExchange();
+
+        if (exchange != null) {
+            getExceptionHandler().handleException("Error processing exchange", exchange, e);
+        } else {
+            getExceptionHandler().handleException(e);
         }
     }
 
diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java
index 49a41a59132..220e9b86ab6 100644
--- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java
+++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorProducer.java
@@ -43,9 +43,10 @@ public class DisruptorProducer extends DefaultAsyncProducer {
     private final DisruptorEndpoint endpoint;
     private boolean blockWhenFull;
 
-    public DisruptorProducer(final DisruptorEndpoint endpoint,
-                             final WaitForTaskToComplete waitForTaskToComplete,
-                             final long timeout, boolean blockWhenFull) {
+    public DisruptorProducer(
+            final DisruptorEndpoint endpoint,
+            final WaitForTaskToComplete waitForTaskToComplete,
+            final long timeout, boolean blockWhenFull) {
         super(endpoint);
         this.waitForTaskToComplete = waitForTaskToComplete;
         this.timeout = timeout;
@@ -85,41 +86,7 @@ public class DisruptorProducer extends DefaultAsyncProducer {
             final CountDownLatch latch = new CountDownLatch(1);
 
             // we should wait for the reply so install a on completion so we know when its complete
-            copy.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() {
-                @Override
-                public void onDone(final Exchange response) {
-                    // check for timeout, which then already would have invoked the latch
-                    if (latch.getCount() == 0) {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("{}. Timeout occurred so response will be ignored: {}", this,
-                                    response.getMessage());
-                        }
-                    } else {
-                        if (LOG.isTraceEnabled()) {
-                            LOG.trace("{} with response: {}", this,
-                                    response.getMessage());
-                        }
-                        try {
-                            ExchangeHelper.copyResults(exchange, response);
-                        } finally {
-                            // always ensure latch is triggered
-                            latch.countDown();
-                        }
-                    }
-                }
-
-                @Override
-                public boolean allowHandover() {
-                    // do not allow handover as we want to seda producer to have its completion triggered
-                    // at this point in the routing (at this leg), instead of at the very last (this ensure timeout is honored)
-                    return false;
-                }
-
-                @Override
-                public String toString() {
-                    return "onDone at endpoint: " + endpoint;
-                }
-            });
+            copy.getExchangeExtension().addOnCompletion(newOnCompletion(exchange, latch));
 
             doPublish(copy);
 
@@ -176,6 +143,42 @@ public class DisruptorProducer extends DefaultAsyncProducer {
         return true;
     }
 
+    private SynchronizationAdapter newOnCompletion(Exchange exchange, CountDownLatch latch) {
+        return new SynchronizationAdapter() {
+            @Override
+            public void onDone(final Exchange response) {
+                // check for timeout, which then already would have invoked the latch
+                if (latch.getCount() == 0) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("{}. Timeout occurred so response will be ignored: {}", this, response.getMessage());
+                    }
+                } else {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("{} with response: {}", this, response.getMessage());
+                    }
+                    try {
+                        ExchangeHelper.copyResults(exchange, response);
+                    } finally {
+                        // always ensure latch is triggered
+                        latch.countDown();
+                    }
+                }
+            }
+
+            @Override
+            public boolean allowHandover() {
+                // do not allow handover as we want to seda producer to have its completion triggered
+                // at this point in the routing (at this leg), instead of at the very last (this ensure timeout is honored)
+                return false;
+            }
+
+            @Override
+            public String toString() {
+                return "onDone at endpoint: " + endpoint;
+            }
+        };
+    }
+
     private void doPublish(Exchange exchange) {
         LOG.trace("Publishing Exchange to disruptor ringbuffer: {}", exchange);