You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/04/02 12:50:07 UTC

[camel] 03/09: CAMEL-15105: make the AsyncProcessorAwaitManager a plugin of the context

This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 501498d9c8620dd825f94bb177ff63719f2d18d9
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Sat Apr 1 15:58:05 2023 +0200

    CAMEL-15105: make the AsyncProcessorAwaitManager a plugin of the context
---
 .../java/org/apache/camel/ExtendedCamelContext.java | 16 ----------------
 .../camel/impl/engine/AbstractCamelContext.java     |  8 ++------
 .../impl/engine/DefaultCamelContextExtension.java   | 19 -------------------
 .../impl/engine/SharedCamelInternalProcessor.java   |  3 ++-
 .../apache/camel/impl/console/BlockedConsole.java   |  5 +++--
 .../camel/impl/ExtendedCamelContextConfigurer.java  |  6 ------
 .../impl/lw/LightweightCamelContextExtension.java   | 13 -------------
 .../errorhandler/RedeliveryErrorHandler.java        |  2 +-
 .../core/xml/AbstractCamelContextFactoryBean.java   |  2 +-
 .../processor/async/AsyncEndpointPolicyTest.java    |  3 ++-
 .../AsyncProcessorAwaitManagerInterruptTest.java    | 21 ++++++++++++---------
 ...ssorAwaitManagerInterruptWithRedeliveryTest.java | 19 +++++++++++--------
 .../async/AsyncProcessorAwaitManagerTest.java       | 16 +++++++++-------
 .../camel/main/DefaultConfigurationConfigurer.java  |  2 +-
 .../apache/camel/support/AsyncProcessorHelper.java  |  2 +-
 .../apache/camel/support/AsyncProcessorSupport.java |  2 +-
 .../apache/camel/support/DefaultAsyncProducer.java  |  2 +-
 .../java/org/apache/camel/support/PluginHelper.java | 20 +++++++++++++++++++-
 18 files changed, 66 insertions(+), 95 deletions(-)

diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
index e8346448418..16dbcc3c229 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
@@ -23,7 +23,6 @@ import java.util.function.Supplier;
 
 import org.apache.camel.catalog.RuntimeCamelCatalog;
 import org.apache.camel.spi.AnnotationBasedProcessorFactory;
-import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.BeanIntrospection;
 import org.apache.camel.spi.BeanProcessorFactory;
 import org.apache.camel.spi.BeanProxyFactory;
@@ -49,7 +48,6 @@ import org.apache.camel.spi.ResourceLoader;
 import org.apache.camel.spi.RestBindingJaxbDataFormatFactory;
 import org.apache.camel.spi.RouteController;
 import org.apache.camel.spi.RouteStartupOrder;
-import org.apache.camel.spi.RoutesLoader;
 import org.apache.camel.spi.StartupStepRecorder;
 import org.apache.camel.spi.UnitOfWorkFactory;
 
@@ -384,20 +382,6 @@ public interface ExtendedCamelContext {
      */
     void addLogListener(LogListener listener);
 
-    /**
-     * Gets the {@link org.apache.camel.AsyncProcessor} await manager.
-     *
-     * @return the manager
-     */
-    AsyncProcessorAwaitManager getAsyncProcessorAwaitManager();
-
-    /**
-     * Sets a custom {@link org.apache.camel.AsyncProcessor} await manager.
-     *
-     * @param manager the manager
-     */
-    void setAsyncProcessorAwaitManager(AsyncProcessorAwaitManager manager);
-
     /**
      * Gets the {@link BeanIntrospection}
      */
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index a177273fd49..36aecfecdc5 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -221,7 +221,6 @@ public abstract class AbstractCamelContext extends BaseService
     volatile ModelToXMLDumper modelToXMLDumper;
     volatile RestBindingJaxbDataFormatFactory restBindingJaxbDataFormatFactory;
     volatile RuntimeCamelCatalog runtimeCamelCatalog;
-    volatile AsyncProcessorAwaitManager asyncProcessorAwaitManager;
     volatile UnitOfWorkFactory unitOfWorkFactory;
     volatile BeanIntrospection beanIntrospection;
     volatile boolean eventNotificationApplicable;
@@ -381,6 +380,7 @@ public abstract class AbstractCamelContext extends BaseService
         camelContextExtension.lazyAddContextPlugin(InterceptEndpointFactory.class, this::createInterceptEndpointFactory);
         camelContextExtension.lazyAddContextPlugin(RouteFactory.class, this::createRouteFactory);
         camelContextExtension.lazyAddContextPlugin(RoutesLoader.class, this::createRoutesLoader);
+        camelContextExtension.lazyAddContextPlugin(AsyncProcessorAwaitManager.class, this::createAsyncProcessorAwaitManager);
 
         if (build) {
             try {
@@ -2878,6 +2878,7 @@ public abstract class AbstractCamelContext extends BaseService
 
         // shutdown await manager to trigger interrupt of blocked threads to
         // attempt to free these threads graceful
+        final AsyncProcessorAwaitManager asyncProcessorAwaitManager = PluginHelper.getAsyncProcessorAwaitManager(this);
         InternalServiceManager.shutdownServices(this, asyncProcessorAwaitManager);
 
         // we need also to include routes which failed to start to ensure all resources get stopped when stopping Camel
@@ -3286,7 +3287,6 @@ public abstract class AbstractCamelContext extends BaseService
         typeConverterRegistry = null;
         typeConverter = null;
         reactiveExecutor = null;
-        asyncProcessorAwaitManager = null;
         exchangeFactory = null;
         exchangeFactoryManager = null;
         processorExchangeFactory = null;
@@ -4171,10 +4171,6 @@ public abstract class AbstractCamelContext extends BaseService
         camelContextExtension.addInterceptStrategy(interceptStrategy);
     }
 
-    public AsyncProcessorAwaitManager getAsyncProcessorAwaitManager() {
-        return camelContextExtension.getAsyncProcessorAwaitManager();
-    }
-
     public BeanIntrospection getBeanIntrospection() {
         return camelContextExtension.getBeanIntrospection();
     }
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java
index b981927d3cb..dc0532c2898 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java
@@ -37,7 +37,6 @@ import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.Service;
 import org.apache.camel.catalog.RuntimeCamelCatalog;
 import org.apache.camel.spi.AnnotationBasedProcessorFactory;
-import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.BeanIntrospection;
 import org.apache.camel.spi.BeanProcessorFactory;
 import org.apache.camel.spi.BeanProxyFactory;
@@ -407,24 +406,6 @@ class DefaultCamelContextExtension implements ExtendedCamelContext {
         }
     }
 
-    @Override
-    public AsyncProcessorAwaitManager getAsyncProcessorAwaitManager() {
-        if (camelContext.asyncProcessorAwaitManager == null) {
-            synchronized (camelContext.lock) {
-                if (camelContext.asyncProcessorAwaitManager == null) {
-                    setAsyncProcessorAwaitManager(camelContext.createAsyncProcessorAwaitManager());
-                }
-            }
-        }
-        return camelContext.asyncProcessorAwaitManager;
-    }
-
-    @Override
-    public void setAsyncProcessorAwaitManager(AsyncProcessorAwaitManager asyncProcessorAwaitManager) {
-        camelContext.asyncProcessorAwaitManager
-                = camelContext.getInternalServiceManager().addService(asyncProcessorAwaitManager);
-    }
-
     @Override
     public BeanIntrospection getBeanIntrospection() {
         if (camelContext.beanIntrospection == null) {
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java
index 6ae6ed90103..28f89571d46 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java
@@ -37,6 +37,7 @@ import org.apache.camel.spi.Transformer;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.AsyncCallbackToCompletableFutureAdapter;
 import org.apache.camel.support.OrderedComparator;
+import org.apache.camel.support.PluginHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,7 +83,7 @@ public class SharedCamelInternalProcessor implements SharedInternalProcessor {
     public SharedCamelInternalProcessor(CamelContext camelContext, CamelInternalProcessorAdvice... advices) {
         this.camelContext = camelContext;
         this.reactiveExecutor = camelContext.getCamelContextExtension().getReactiveExecutor();
-        this.awaitManager = camelContext.getCamelContextExtension().getAsyncProcessorAwaitManager();
+        this.awaitManager = PluginHelper.getAsyncProcessorAwaitManager(camelContext);
         this.shutdownStrategy = camelContext.getShutdownStrategy();
 
         if (advices != null) {
diff --git a/core/camel-console/src/main/java/org/apache/camel/impl/console/BlockedConsole.java b/core/camel-console/src/main/java/org/apache/camel/impl/console/BlockedConsole.java
index 339f216f558..8c897dc9b93 100644
--- a/core/camel-console/src/main/java/org/apache/camel/impl/console/BlockedConsole.java
+++ b/core/camel-console/src/main/java/org/apache/camel/impl/console/BlockedConsole.java
@@ -22,6 +22,7 @@ import java.util.Map;
 
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.annotations.DevConsole;
+import org.apache.camel.support.PluginHelper;
 import org.apache.camel.support.console.AbstractDevConsole;
 import org.apache.camel.util.TimeUtils;
 import org.apache.camel.util.json.JsonObject;
@@ -37,7 +38,7 @@ public class BlockedConsole extends AbstractDevConsole {
     protected String doCallText(Map<String, Object> options) {
         StringBuilder sb = new StringBuilder();
 
-        AsyncProcessorAwaitManager am = getCamelContext().getCamelContextExtension().getAsyncProcessorAwaitManager();
+        AsyncProcessorAwaitManager am = PluginHelper.getAsyncProcessorAwaitManager(getCamelContext());
         sb.append(String.format("\n    Blocked: %s", am.size()));
         for (AsyncProcessorAwaitManager.AwaitThread at : am.browse()) {
             String age = TimeUtils.printDuration(at.getWaitDuration(), true);
@@ -52,7 +53,7 @@ public class BlockedConsole extends AbstractDevConsole {
     protected JsonObject doCallJson(Map<String, Object> options) {
         JsonObject root = new JsonObject();
 
-        AsyncProcessorAwaitManager am = getCamelContext().getCamelContextExtension().getAsyncProcessorAwaitManager();
+        AsyncProcessorAwaitManager am = PluginHelper.getAsyncProcessorAwaitManager(getCamelContext());
         root.put("blocked", am.size());
 
         final List<JsonObject> list = new ArrayList<>();
diff --git a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
index 77230c6cdf2..5866067408b 100644
--- a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
+++ b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
@@ -23,8 +23,6 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
         switch (ignoreCase ? name.toLowerCase() : name) {
         case "annotationbasedprocessorfactory":
         case "AnnotationBasedProcessorFactory": target.setAnnotationBasedProcessorFactory(property(camelContext, org.apache.camel.spi.AnnotationBasedProcessorFactory.class, value)); return true;
-        case "asyncprocessorawaitmanager":
-        case "AsyncProcessorAwaitManager": target.setAsyncProcessorAwaitManager(property(camelContext, org.apache.camel.spi.AsyncProcessorAwaitManager.class, value)); return true;
         case "basepackagescan":
         case "BasePackageScan": target.setBasePackageScan(property(camelContext, java.lang.String.class, value)); return true;
         case "beanintrospection":
@@ -76,8 +74,6 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
         switch (ignoreCase ? name.toLowerCase() : name) {
         case "annotationbasedprocessorfactory":
         case "AnnotationBasedProcessorFactory": return org.apache.camel.spi.AnnotationBasedProcessorFactory.class;
-        case "asyncprocessorawaitmanager":
-        case "AsyncProcessorAwaitManager": return org.apache.camel.spi.AsyncProcessorAwaitManager.class;
         case "basepackagescan":
         case "BasePackageScan": return java.lang.String.class;
         case "beanintrospection":
@@ -130,8 +126,6 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
         switch (ignoreCase ? name.toLowerCase() : name) {
         case "annotationbasedprocessorfactory":
         case "AnnotationBasedProcessorFactory": return target.getAnnotationBasedProcessorFactory();
-        case "asyncprocessorawaitmanager":
-        case "AsyncProcessorAwaitManager": return target.getAsyncProcessorAwaitManager();
         case "basepackagescan":
         case "BasePackageScan": return target.getBasePackageScan();
         case "beanintrospection":
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContextExtension.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContextExtension.java
index 82469a20b83..1878d61c495 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContextExtension.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContextExtension.java
@@ -38,7 +38,6 @@ import org.apache.camel.Service;
 import org.apache.camel.ServiceStatus;
 import org.apache.camel.catalog.RuntimeCamelCatalog;
 import org.apache.camel.spi.AnnotationBasedProcessorFactory;
-import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.BeanIntrospection;
 import org.apache.camel.spi.BeanProcessorFactory;
 import org.apache.camel.spi.BeanProxyFactory;
@@ -63,9 +62,7 @@ import org.apache.camel.spi.Registry;
 import org.apache.camel.spi.ResourceLoader;
 import org.apache.camel.spi.RestBindingJaxbDataFormatFactory;
 import org.apache.camel.spi.RouteController;
-import org.apache.camel.spi.RouteFactory;
 import org.apache.camel.spi.RouteStartupOrder;
-import org.apache.camel.spi.RoutesLoader;
 import org.apache.camel.spi.StartupStepRecorder;
 import org.apache.camel.spi.SupervisingRouteController;
 import org.apache.camel.spi.UnitOfWorkFactory;
@@ -290,16 +287,6 @@ class LightweightCamelContextExtension implements ExtendedCamelContext {
         throw new UnsupportedOperationException();
     }
 
-    @Override
-    public AsyncProcessorAwaitManager getAsyncProcessorAwaitManager() {
-        return camelContext.getCamelContextExtension().getAsyncProcessorAwaitManager();
-    }
-
-    @Override
-    public void setAsyncProcessorAwaitManager(AsyncProcessorAwaitManager manager) {
-        throw new UnsupportedOperationException();
-    }
-
     @Override
     public BeanIntrospection getBeanIntrospection() {
         return camelContext.getCamelContextExtension().getBeanIntrospection();
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 2e8eba75bd7..688b7cf1cd6 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -118,7 +118,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
 
         this.camelContext = camelContext;
         this.reactiveExecutor = camelContext.getCamelContextExtension().getReactiveExecutor();
-        this.awaitManager = camelContext.getCamelContextExtension().getAsyncProcessorAwaitManager();
+        this.awaitManager = PluginHelper.getAsyncProcessorAwaitManager(camelContext);
         this.shutdownStrategy = camelContext.getShutdownStrategy();
         this.redeliveryProcessor = redeliveryProcessor;
         this.deadLetter = deadLetter;
diff --git a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
index faa44550efd..ea03b928510 100644
--- a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
+++ b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
@@ -267,7 +267,7 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex
         AsyncProcessorAwaitManager asyncProcessorAwaitManager = getBeanForType(AsyncProcessorAwaitManager.class);
         if (asyncProcessorAwaitManager != null) {
             LOG.info("Using custom AsyncProcessorAwaitManager: {}", asyncProcessorAwaitManager);
-            getContext().getCamelContextExtension().setAsyncProcessorAwaitManager(asyncProcessorAwaitManager);
+            getContext().getCamelContextExtension().addContextPlugin(AsyncProcessorAwaitManager.class, asyncProcessorAwaitManager);
         }
         ManagementStrategy managementStrategy = getBeanForType(ManagementStrategy.class);
         if (managementStrategy != null) {
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java
index 9522bbc165c..7c3912d4b62 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java
@@ -31,6 +31,7 @@ import org.apache.camel.spi.Policy;
 import org.apache.camel.spi.Registry;
 import org.apache.camel.support.AsyncCallbackToCompletableFutureAdapter;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
+import org.apache.camel.support.PluginHelper;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -125,7 +126,7 @@ public class AsyncEndpointPolicyTest extends ContextTestSupport {
 
                 public void process(Exchange exchange) throws Exception {
                     final AsyncProcessorAwaitManager awaitManager
-                            = exchange.getContext().getCamelContextExtension().getAsyncProcessorAwaitManager();
+                            = PluginHelper.getAsyncProcessorAwaitManager(exchange.getContext());
                     awaitManager.process(this, exchange);
                 }
 
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
index ec8383aaad5..51db2d95b5a 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
@@ -25,6 +25,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
+import org.apache.camel.support.PluginHelper;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -34,10 +35,10 @@ public class AsyncProcessorAwaitManagerInterruptTest extends ContextTestSupport
 
     @Test
     public void testAsyncAwaitInterrupt() throws Exception {
-        context.getCamelContextExtension().getAsyncProcessorAwaitManager().getStatistics().setStatisticsEnabled(true);
-
-        assertEquals(0, context.getCamelContextExtension().getAsyncProcessorAwaitManager().size());
+        final AsyncProcessorAwaitManager asyncProcessorAwaitManager = PluginHelper.getAsyncProcessorAwaitManager(context);
+        asyncProcessorAwaitManager.getStatistics().setStatisticsEnabled(true);
 
+        assertEquals(0, asyncProcessorAwaitManager.size());
         getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
         getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
         getMockEndpoint("mock:result").expectedMessageCount(0);
@@ -51,10 +52,10 @@ public class AsyncProcessorAwaitManagerInterruptTest extends ContextTestSupport
 
         assertMockEndpointsSatisfied();
 
-        assertEquals(0, context.getCamelContextExtension().getAsyncProcessorAwaitManager().size());
+        assertEquals(0, asyncProcessorAwaitManager.size());
         assertEquals(1,
-                context.getCamelContextExtension().getAsyncProcessorAwaitManager().getStatistics().getThreadsBlocked());
-        assertEquals(1, context.getCamelContextExtension().getAsyncProcessorAwaitManager().getStatistics()
+                asyncProcessorAwaitManager.getStatistics().getThreadsBlocked());
+        assertEquals(1, asyncProcessorAwaitManager.getStatistics()
                 .getThreadsInterrupted());
     }
 
@@ -69,17 +70,19 @@ public class AsyncProcessorAwaitManagerInterruptTest extends ContextTestSupport
                         .to("mock:after").process(new Processor() {
                             @Override
                             public void process(Exchange exchange) throws Exception {
-                                int size = context.getCamelContextExtension().getAsyncProcessorAwaitManager().size();
+                                final AsyncProcessorAwaitManager asyncProcessorAwaitManager
+                                        = PluginHelper.getAsyncProcessorAwaitManager(context);
+                                int size = asyncProcessorAwaitManager.size();
                                 log.info("async inflight: {}", size);
                                 assertEquals(1, size);
 
                                 Collection<AsyncProcessorAwaitManager.AwaitThread> threads
-                                        = context.getCamelContextExtension().getAsyncProcessorAwaitManager().browse();
+                                        = asyncProcessorAwaitManager.browse();
                                 AsyncProcessorAwaitManager.AwaitThread thread = threads.iterator().next();
 
                                 // lets interrupt it
                                 String id = thread.getExchange().getExchangeId();
-                                context.getCamelContextExtension().getAsyncProcessorAwaitManager().interrupt(id);
+                                asyncProcessorAwaitManager.interrupt(id);
                             }
                         }).transform(constant("Hi Camel")).to("mock:result");
             }
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptWithRedeliveryTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptWithRedeliveryTest.java
index 808e39cb562..e8dcf3bee9b 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptWithRedeliveryTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptWithRedeliveryTest.java
@@ -26,6 +26,7 @@ import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.Registry;
+import org.apache.camel.support.PluginHelper;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
@@ -50,9 +51,10 @@ public class AsyncProcessorAwaitManagerInterruptWithRedeliveryTest extends Conte
 
     @Test
     public void testAsyncAwaitInterrupt() throws Exception {
-        context.getCamelContextExtension().getAsyncProcessorAwaitManager().getStatistics().setStatisticsEnabled(true);
+        final AsyncProcessorAwaitManager asyncProcessorAwaitManager = PluginHelper.getAsyncProcessorAwaitManager(context);
+        asyncProcessorAwaitManager.getStatistics().setStatisticsEnabled(true);
 
-        assertEquals(0, context.getCamelContextExtension().getAsyncProcessorAwaitManager().size());
+        assertEquals(0, asyncProcessorAwaitManager.size());
 
         getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
         getMockEndpoint("mock:result").expectedMessageCount(0);
@@ -72,10 +74,10 @@ public class AsyncProcessorAwaitManagerInterruptWithRedeliveryTest extends Conte
         // Check we have not reached the full 5 re-deliveries
         verify(bean, atMost(4)).callMe();
 
-        assertEquals(0, context.getCamelContextExtension().getAsyncProcessorAwaitManager().size());
+        assertEquals(0, asyncProcessorAwaitManager.size());
         assertEquals(1,
-                context.getCamelContextExtension().getAsyncProcessorAwaitManager().getStatistics().getThreadsBlocked());
-        assertEquals(1, context.getCamelContextExtension().getAsyncProcessorAwaitManager().getStatistics()
+                asyncProcessorAwaitManager.getStatistics().getThreadsBlocked());
+        assertEquals(1, asyncProcessorAwaitManager.getStatistics()
                 .getThreadsInterrupted());
     }
 
@@ -89,16 +91,17 @@ public class AsyncProcessorAwaitManagerInterruptWithRedeliveryTest extends Conte
             }
 
             // Get our blocked thread
-            int size = context.getCamelContextExtension().getAsyncProcessorAwaitManager().size();
+            final AsyncProcessorAwaitManager asyncProcessorAwaitManager = PluginHelper.getAsyncProcessorAwaitManager(context);
+            int size = asyncProcessorAwaitManager.size();
             assertEquals(1, size);
 
             Collection<AsyncProcessorAwaitManager.AwaitThread> threads
-                    = context.getCamelContextExtension().getAsyncProcessorAwaitManager().browse();
+                    = asyncProcessorAwaitManager.browse();
             AsyncProcessorAwaitManager.AwaitThread thread = threads.iterator().next();
 
             // Interrupt it
             String id = thread.getExchange().getExchangeId();
-            context.getCamelContextExtension().getAsyncProcessorAwaitManager().interrupt(id);
+            asyncProcessorAwaitManager.interrupt(id);
         }).start();
     }
 
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
index c70e060063c..659797fa756 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
@@ -23,6 +23,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
+import org.apache.camel.support.PluginHelper;
 import org.junit.jupiter.api.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -32,9 +33,10 @@ public class AsyncProcessorAwaitManagerTest extends ContextTestSupport {
 
     @Test
     public void testAsyncAwait() throws Exception {
-        context.getCamelContextExtension().getAsyncProcessorAwaitManager().getStatistics().setStatisticsEnabled(true);
+        final AsyncProcessorAwaitManager asyncProcessorAwaitManager = PluginHelper.getAsyncProcessorAwaitManager(context);
+        asyncProcessorAwaitManager.getStatistics().setStatisticsEnabled(true);
 
-        assertEquals(0, context.getCamelContextExtension().getAsyncProcessorAwaitManager().size());
+        assertEquals(0, asyncProcessorAwaitManager.size());
 
         getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
         getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
@@ -45,10 +47,10 @@ public class AsyncProcessorAwaitManagerTest extends ContextTestSupport {
 
         assertMockEndpointsSatisfied();
 
-        assertEquals(0, context.getCamelContextExtension().getAsyncProcessorAwaitManager().size());
+        assertEquals(0, asyncProcessorAwaitManager.size());
         assertEquals(1,
-                context.getCamelContextExtension().getAsyncProcessorAwaitManager().getStatistics().getThreadsBlocked());
-        assertEquals(0, context.getCamelContextExtension().getAsyncProcessorAwaitManager().getStatistics()
+                asyncProcessorAwaitManager.getStatistics().getThreadsBlocked());
+        assertEquals(0, asyncProcessorAwaitManager.getStatistics()
                 .getThreadsInterrupted());
     }
 
@@ -63,12 +65,12 @@ public class AsyncProcessorAwaitManagerTest extends ContextTestSupport {
                         .process(new Processor() {
                             @Override
                             public void process(Exchange exchange) throws Exception {
-                                int size = context.getCamelContextExtension().getAsyncProcessorAwaitManager().size();
+                                int size = PluginHelper.getAsyncProcessorAwaitManager(context).size();
                                 log.info("async inflight: {}", size);
                                 assertEquals(1, size);
 
                                 Collection<AsyncProcessorAwaitManager.AwaitThread> threads
-                                        = context.getCamelContextExtension().getAsyncProcessorAwaitManager().browse();
+                                        = PluginHelper.getAsyncProcessorAwaitManager(context).browse();
                                 AsyncProcessorAwaitManager.AwaitThread thread = threads.iterator().next();
 
                                 long wait = thread.getWaitDuration();
diff --git a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
index cfd445143f5..d1cc0adc905 100644
--- a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
+++ b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
@@ -383,7 +383,7 @@ public final class DefaultConfigurationConfigurer {
         }
         AsyncProcessorAwaitManager apam = getSingleBeanOfType(registry, AsyncProcessorAwaitManager.class);
         if (apam != null) {
-            ecc.getCamelContextExtension().setAsyncProcessorAwaitManager(apam);
+            ecc.getCamelContextExtension().addContextPlugin(AsyncProcessorAwaitManager.class, apam);
         }
         ManagementStrategy ms = getSingleBeanOfType(registry, ManagementStrategy.class);
         if (ms != null) {
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/AsyncProcessorHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/AsyncProcessorHelper.java
index 6052d2f6dd4..19ae9d8f26c 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/AsyncProcessorHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/AsyncProcessorHelper.java
@@ -43,7 +43,7 @@ public final class AsyncProcessorHelper {
      */
     public static void process(final AsyncProcessor processor, final Exchange exchange) throws Exception {
         final AsyncProcessorAwaitManager awaitManager
-                = exchange.getContext().getCamelContextExtension().getAsyncProcessorAwaitManager();
+                = PluginHelper.getAsyncProcessorAwaitManager(exchange.getContext());
         awaitManager.process(processor, exchange);
     }
 
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/AsyncProcessorSupport.java b/core/camel-support/src/main/java/org/apache/camel/support/AsyncProcessorSupport.java
index b693d3f1263..eba0c4ebd02 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/AsyncProcessorSupport.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/AsyncProcessorSupport.java
@@ -36,7 +36,7 @@ public abstract class AsyncProcessorSupport extends ServiceSupport implements As
     @Override
     public void process(Exchange exchange) throws Exception {
         AsyncProcessorAwaitManager awaitManager
-                = exchange.getContext().getCamelContextExtension().getAsyncProcessorAwaitManager();
+                = PluginHelper.getAsyncProcessorAwaitManager(exchange.getContext());
         awaitManager.process(this, exchange);
     }
 
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultAsyncProducer.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultAsyncProducer.java
index 06df2020b51..24372acc978 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultAsyncProducer.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultAsyncProducer.java
@@ -36,7 +36,7 @@ public abstract class DefaultAsyncProducer extends DefaultProducer implements As
     @Override
     public void process(Exchange exchange) throws Exception {
         AsyncProcessorAwaitManager awaitManager
-                = exchange.getContext().getCamelContextExtension().getAsyncProcessorAwaitManager();
+                = PluginHelper.getAsyncProcessorAwaitManager(exchange.getContext());
         awaitManager.process(this, exchange);
     }
 
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java
index deebc08f9d1..4870a9e22f5 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java
@@ -23,6 +23,7 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.console.DevConsoleResolver;
 import org.apache.camel.health.HealthCheckResolver;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.CamelBeanPostProcessor;
 import org.apache.camel.spi.CamelDependencyInjectionAnnotationFactory;
 import org.apache.camel.spi.ComponentNameResolver;
@@ -430,11 +431,28 @@ public final class PluginHelper {
         return getRoutesLoader(camelContext.getCamelContextExtension());
     }
 
-
     /**
      * Gets the {@link RoutesLoader} to be used.
      */
     public static RoutesLoader getRoutesLoader(ExtendedCamelContext extendedCamelContext) {
         return extendedCamelContext.getContextPlugin(RoutesLoader.class);
     }
+
+    /**
+     * Gets the {@link org.apache.camel.AsyncProcessor} await manager.
+     *
+     * @return the manager
+     */
+    public static AsyncProcessorAwaitManager getAsyncProcessorAwaitManager(CamelContext camelContext) {
+        return getAsyncProcessorAwaitManager(camelContext.getCamelContextExtension());
+    }
+
+    /**
+     * Gets the {@link org.apache.camel.AsyncProcessor} await manager.
+     *
+     * @return the manager
+     */
+    public static AsyncProcessorAwaitManager getAsyncProcessorAwaitManager(ExtendedCamelContext extendedCamelContext) {
+        return extendedCamelContext.getContextPlugin(AsyncProcessorAwaitManager.class);
+    }
 }