You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/01/24 13:57:34 UTC

[camel] 02/02: CAMEL-14435: Optimize core

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit e1c93a38abf6ad225f5b64c0bc768e1e134f8543
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Jan 24 14:57:06 2020 +0100

    CAMEL-14435: Optimize core
---
 .../workitem/InOnlyCamelWorkItemHandlerTest.java   | 101 ------
 .../workitem/InOutCamelWorkItemHandlerTest.java    | 382 ---------------------
 .../camel/component/kafka/KafkaProducerTest.java   |   6 +-
 .../apache/camel/component/mock/MockEndpoint.java  |   7 +-
 .../openstack/AbstractProducerTestSupport.java     |   6 +-
 .../main/java/org/apache/camel/CamelContext.java   |  22 --
 .../org/apache/camel/ExtendedCamelContext.java     |  21 ++
 .../engine/DefaultAsyncProcessorAwaitManager.java  |   3 +-
 .../camel/processor/CamelInternalProcessor.java    |   5 +-
 .../org/apache/camel/processor/LoopProcessor.java  |  15 +-
 .../apache/camel/processor/MulticastProcessor.java |  16 +-
 .../java/org/apache/camel/processor/Pipeline.java  |   3 +-
 .../processor/SharedCamelInternalProcessor.java    |   2 +-
 .../org/apache/camel/processor/TryProcessor.java   |  13 +-
 .../processor/aggregate/AggregateProcessor.java    |   6 +-
 .../errorhandler/RedeliveryErrorHandler.java       |   2 +-
 .../loadbalancer/FailOverLoadBalancer.java         |   5 +-
 .../processor/loadbalancer/TopicLoadBalancer.java  |   5 +-
 .../java/org/apache/camel/reifier/LoopReifier.java |   2 +-
 .../java/org/apache/camel/reifier/TryReifier.java  |   2 +-
 .../core/xml/AbstractCamelContextFactoryBean.java  |   4 +-
 .../impl/CustomHeadersMapFactoryRouteTest.java     |   5 +-
 .../impl/HashMapHeadersMapFactoryRouteTest.java    |   3 +-
 .../headersmap/CamelFastHeadersMapTest.java        |   3 +-
 .../management/mbean/ManagedCamelContext.java      |   2 +-
 .../org/apache/camel/support/DefaultExchange.java  |   3 +-
 .../org/apache/camel/support/DefaultMessage.java   |   7 +-
 27 files changed, 101 insertions(+), 550 deletions(-)

diff --git a/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/InOnlyCamelWorkItemHandlerTest.java b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/InOnlyCamelWorkItemHandlerTest.java
deleted file mode 100644
index 820b50d..0000000
--- a/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/InOnlyCamelWorkItemHandlerTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.jbpm.workitem;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.ProducerTemplate;
-import org.apache.camel.component.jbpm.JBPMConstants;
-import org.apache.camel.impl.engine.DefaultHeadersMapFactory;
-import org.apache.camel.spi.HeadersMapFactory;
-import org.drools.core.process.instance.impl.WorkItemImpl;
-import org.jbpm.process.workitem.core.TestWorkItemManager;
-import org.jbpm.services.api.service.ServiceRegistry;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.kie.api.runtime.manager.RuntimeManager;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import static org.hamcrest.CoreMatchers.*;
-import static org.junit.Assert.*;
-import static org.mockito.ArgumentMatchers.*;
-import static org.mockito.Mockito.*;
-
-@RunWith(MockitoJUnitRunner.class)
-public class InOnlyCamelWorkItemHandlerTest {
-
-    @Mock
-    ProducerTemplate producerTemplate;
-
-    @Mock
-    Exchange outExchange;
-
-    @Mock
-    Message outMessage;
-
-    @Mock
-    CamelContext camelContext;
-
-    @Mock
-    RuntimeManager runtimeManager;
-
-    @Test
-    public void testExecuteInOnlyLocalCamelContext() throws Exception {
-
-        String camelEndpointId = "testCamelRoute";
-        String camelRouteUri = "direct:" + camelEndpointId;
-
-        String testReponse = "testResponse";
-
-        String runtimeManagerId = "testRuntimeManager";
-
-        when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId);
-
-        when(producerTemplate.send(eq(camelRouteUri), ArgumentMatchers.any(Exchange.class))).thenReturn(outExchange);
-        when(producerTemplate.getCamelContext()).thenReturn(camelContext);
-
-        when(camelContext.createProducerTemplate()).thenReturn(producerTemplate);
-        HeadersMapFactory hmf = new DefaultHeadersMapFactory();
-        when(camelContext.getHeadersMapFactory()).thenReturn(hmf);
-
-        // Register the RuntimeManager bound camelcontext.
-        try {
-            ServiceRegistry.get().register(runtimeManagerId + "_CamelService", camelContext);
-
-            WorkItemImpl workItem = new WorkItemImpl();
-            workItem.setParameter(JBPMConstants.CAMEL_ENDPOINT_ID_WI_PARAM, camelEndpointId);
-            workItem.setParameter("Request", "someRequest");
-            workItem.setDeploymentId("testDeploymentId");
-            workItem.setProcessInstanceId(1L);
-            workItem.setId(1L);
-
-            AbstractCamelWorkItemHandler handler = new InOnlyCamelWorkItemHandler(runtimeManager);
-
-            TestWorkItemManager manager = new TestWorkItemManager();
-            handler.executeWorkItem(workItem,
-                    manager);
-            assertThat(manager.getResults(), is(notNullValue()));
-            // InOnly does not complete WorkItem.
-            assertThat(manager.getResults().size(), equalTo(0));
-        } finally {
-            ServiceRegistry.get().remove(runtimeManagerId + "_CamelService");
-        }
-    }
-}
diff --git a/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/InOutCamelWorkItemHandlerTest.java b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/InOutCamelWorkItemHandlerTest.java
deleted file mode 100644
index 32a967d..0000000
--- a/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/InOutCamelWorkItemHandlerTest.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.jbpm.workitem;
-
-import java.util.Map;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.ProducerTemplate;
-import org.apache.camel.component.jbpm.JBPMConstants;
-import org.apache.camel.impl.engine.DefaultHeadersMapFactory;
-import org.apache.camel.spi.HeadersMapFactory;
-import org.drools.core.process.instance.impl.WorkItemImpl;
-import org.jbpm.bpmn2.handler.WorkItemHandlerRuntimeException;
-import org.jbpm.process.workitem.core.TestWorkItemManager;
-import org.jbpm.services.api.service.ServiceRegistry;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.kie.api.runtime.manager.RuntimeManager;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import static org.hamcrest.CoreMatchers.*;
-import static org.junit.Assert.*;
-import static org.mockito.ArgumentMatchers.*;
-import static org.mockito.Mockito.*;
-
-@RunWith(MockitoJUnitRunner.class)
-public class InOutCamelWorkItemHandlerTest {
-
-    @Mock
-    ProducerTemplate producerTemplate;
-
-    @Mock
-    Exchange outExchange;
-
-    @Mock
-    Message outMessage;
-
-    @Mock
-    CamelContext camelContext;
-
-    @Mock
-    RuntimeManager runtimeManager;
-
-    @Test
-    public void testExecuteInOutGlobalCamelContext() throws Exception {
-
-        String camelEndpointId = "testCamelRoute";
-        String camelRouteUri = "direct:" + camelEndpointId;
-
-        String testReponse = "testResponse";
-
-        when(producerTemplate.send(eq(camelRouteUri), ArgumentMatchers.any(Exchange.class))).thenReturn(outExchange);
-        when(producerTemplate.getCamelContext()).thenReturn(camelContext);
-
-        when(camelContext.createProducerTemplate()).thenReturn(producerTemplate);
-        HeadersMapFactory hmf = new DefaultHeadersMapFactory();
-        when(camelContext.getHeadersMapFactory()).thenReturn(hmf);
-
-        when(outExchange.getOut()).thenReturn(outMessage);
-        when(outMessage.getBody()).thenReturn(testReponse);
-
-        try {
-            ServiceRegistry.get().register("GlobalCamelService", camelContext);
-
-            TestWorkItemManager manager = new TestWorkItemManager();
-            WorkItemImpl workItem = new WorkItemImpl();
-            workItem.setParameter("CamelEndpointId", camelEndpointId);
-            workItem.setParameter("Request", "someRequest");
-            workItem.setDeploymentId("testDeploymentId");
-            workItem.setProcessInstanceId(1L);
-            workItem.setId(1L);
-
-            AbstractCamelWorkItemHandler handler = new InOutCamelWorkItemHandler();
-
-            handler.executeWorkItem(workItem, manager);
-            assertThat(manager.getResults(), is(notNullValue()));
-            assertThat(manager.getResults().size(), equalTo(1));
-            assertThat(manager.getResults().containsKey(workItem.getId()), is(true));
-            Map<String, Object> results = manager.getResults(workItem.getId());
-            assertThat(results.size(), equalTo(2));
-            assertThat(results.get("Response"), equalTo(testReponse));
-
-        } finally {
-            ServiceRegistry.get().remove("GlobalCamelService");
-        }
-
-    }
-
-    @Test
-    public void testExecuteInOutLocalCamelContext() throws Exception {
-
-        String camelEndpointId = "testCamelRoute";
-        String camelRouteUri = "direct:" + camelEndpointId;
-
-        String testReponse = "testResponse";
-
-        String runtimeManagerId = "testRuntimeManager";
-
-        when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId);
-
-        when(producerTemplate.send(eq(camelRouteUri), ArgumentMatchers.any(Exchange.class))).thenReturn(outExchange);
-        when(producerTemplate.getCamelContext()).thenReturn(camelContext);
-
-        when(camelContext.createProducerTemplate()).thenReturn(producerTemplate);
-        HeadersMapFactory hmf = new DefaultHeadersMapFactory();
-        when(camelContext.getHeadersMapFactory()).thenReturn(hmf);
-
-        when(outExchange.getOut()).thenReturn(outMessage);
-        when(outMessage.getBody()).thenReturn(testReponse);
-
-        // Register the RuntimeManager bound camelcontext.
-        try {
-            ServiceRegistry.get().register(runtimeManagerId + "_CamelService", camelContext);
-
-            WorkItemImpl workItem = new WorkItemImpl();
-            workItem.setParameter(JBPMConstants.CAMEL_ENDPOINT_ID_WI_PARAM, camelEndpointId);
-            workItem.setParameter("Request", "someRequest");
-            workItem.setDeploymentId("testDeploymentId");
-            workItem.setProcessInstanceId(1L);
-            workItem.setId(1L);
-
-            AbstractCamelWorkItemHandler handler = new InOutCamelWorkItemHandler(runtimeManager);
-
-            TestWorkItemManager manager = new TestWorkItemManager();
-            handler.executeWorkItem(workItem, manager);
-            assertThat(manager.getResults(), is(notNullValue()));
-            assertThat(manager.getResults().size(), equalTo(1));
-            assertThat(manager.getResults().containsKey(workItem.getId()), is(true));
-
-            Map<String, Object> results = manager.getResults(workItem.getId());
-            assertThat(results.size(), equalTo(2));
-            assertThat(results.get(JBPMConstants.RESPONSE_WI_PARAM), equalTo(testReponse));
-        } finally {
-            ServiceRegistry.get().remove(runtimeManagerId + "_CamelService");
-        }
-    }
-
-    @Test
-    public void testExecuteInOutLocalCamelContextLazyInit() throws Exception {
-
-        String camelEndpointId = "testCamelRoute";
-        String camelRouteUri = "direct:" + camelEndpointId;
-
-        String testReponse = "testResponse";
-
-        String runtimeManagerId = "testRuntimeManager";
-
-        when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId);
-
-        when(producerTemplate.send(eq(camelRouteUri), ArgumentMatchers.any(Exchange.class))).thenReturn(outExchange);
-        when(producerTemplate.getCamelContext()).thenReturn(camelContext);
-
-        when(camelContext.createProducerTemplate()).thenReturn(producerTemplate);
-        HeadersMapFactory hmf = new DefaultHeadersMapFactory();
-        when(camelContext.getHeadersMapFactory()).thenReturn(hmf);
-
-        when(outExchange.getOut()).thenReturn(outMessage);
-        when(outMessage.getBody()).thenReturn(testReponse);
-
-        WorkItemImpl workItem = new WorkItemImpl();
-        workItem.setParameter(JBPMConstants.CAMEL_ENDPOINT_ID_WI_PARAM, camelEndpointId);
-        workItem.setParameter("Request", "someRequest");
-        workItem.setDeploymentId("testDeploymentId");
-        workItem.setProcessInstanceId(1L);
-        workItem.setId(1L);
-
-        AbstractCamelWorkItemHandler handler = new InOutCamelWorkItemHandler(runtimeManager);
-
-        // Register the context after we've created the WIH to test lazy-init.
-        try {
-            ServiceRegistry.get().register(runtimeManagerId + "_CamelService", camelContext);
-
-            TestWorkItemManager manager = new TestWorkItemManager();
-            handler.executeWorkItem(workItem, manager);
-            assertThat(manager.getResults(), is(notNullValue()));
-            assertThat(manager.getResults().size(), equalTo(1));
-            assertThat(manager.getResults().containsKey(workItem.getId()), is(true));
-
-            Map<String, Object> results = manager.getResults(workItem.getId());
-            assertThat(results.size(), equalTo(2));
-            assertThat(results.get(JBPMConstants.RESPONSE_WI_PARAM), equalTo(testReponse));
-        } finally {
-            ServiceRegistry.get().remove(runtimeManagerId + "_CamelService");
-        }
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testExecuteInOutLocalCamelContextLazyInitFail() throws Exception {
-
-        String camelEndpointId = "testCamelRoute";
-        String camelRouteUri = "direct:" + camelEndpointId;
-
-        String testReponse = "testResponse";
-
-        String runtimeManagerId = "testRuntimeManager";
-
-        when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId);
-
-        WorkItemImpl workItem = new WorkItemImpl();
-        workItem.setParameter(JBPMConstants.CAMEL_ENDPOINT_ID_WI_PARAM, camelEndpointId);
-        workItem.setParameter("Request", "someRequest");
-        workItem.setDeploymentId("testDeploymentId");
-        workItem.setProcessInstanceId(1L);
-        workItem.setId(1L);
-
-        AbstractCamelWorkItemHandler handler = new InOutCamelWorkItemHandler(runtimeManager);
-
-        TestWorkItemManager manager = new TestWorkItemManager();
-        // This is expected to throw an exception.
-        handler.executeWorkItem(workItem, manager);
-
-    }
-    
-    @Test
-    public void testExecuteInOutLocalCamelContextDefaultHandleException() throws Exception {
-
-        String camelEndpointId = "testCamelRoute";
-        String camelRouteUri = "direct:" + camelEndpointId;
-
-        String testReponse = "testResponse";
-
-        String runtimeManagerId = "testRuntimeManager";
-        
-
-        when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId);
-
-        //Throw an error back to the WIH
-        when(producerTemplate.send(eq(camelRouteUri), ArgumentMatchers.any(Exchange.class))).thenThrow(new ToBeHandledException());
-        when(producerTemplate.getCamelContext()).thenReturn(camelContext);
-
-        when(camelContext.createProducerTemplate()).thenReturn(producerTemplate);
-        HeadersMapFactory hmf = new DefaultHeadersMapFactory();
-        when(camelContext.getHeadersMapFactory()).thenReturn(hmf);
-
-        // Register the RuntimeManager bound camelcontext.
-        try {
-            ServiceRegistry.get().register(runtimeManagerId + "_CamelService", camelContext);
-
-            WorkItemImpl workItem = new WorkItemImpl();
-            workItem.setParameter(JBPMConstants.CAMEL_ENDPOINT_ID_WI_PARAM, camelEndpointId);
-            workItem.setParameter("Request", "someRequest");
-            workItem.setDeploymentId("testDeploymentId");
-            workItem.setProcessInstanceId(1L);
-            workItem.setId(1L);
-
-            AbstractCamelWorkItemHandler handler = new InOutCamelWorkItemHandler(runtimeManager);
-
-            TestWorkItemManager manager = new TestWorkItemManager();
-            try {
-                handler.executeWorkItem(workItem, manager);
-                throw new RuntimeException("The test expects an exception. This code should never be reached.");
-            } catch (Throwable wihRe) {
-                assertThat(wihRe, is(instanceOf(WorkItemHandlerRuntimeException.class)));
-                assertThat(wihRe.getCause(), is(instanceOf(ToBeHandledException.class)));
-            }
-
-        } finally {
-            ServiceRegistry.get().remove(runtimeManagerId + "_CamelService");
-        }
-    }
-
-    @Test
-    public void testExecuteInOutLocalCamelContextExplicitHandleException() throws Exception {
-
-        String camelEndpointId = "testCamelRoute";
-        String camelRouteUri = "direct:" + camelEndpointId;
-
-        String testReponse = "testResponse";
-
-        String runtimeManagerId = "testRuntimeManager";
-        
-
-        when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId);
-
-        //Throw an error back to the WIH
-        when(producerTemplate.send(eq(camelRouteUri), ArgumentMatchers.any(Exchange.class))).thenThrow(new ToBeHandledException());
-        when(producerTemplate.getCamelContext()).thenReturn(camelContext);
-
-        when(camelContext.createProducerTemplate()).thenReturn(producerTemplate);
-        HeadersMapFactory hmf = new DefaultHeadersMapFactory();
-        when(camelContext.getHeadersMapFactory()).thenReturn(hmf);
-
-        // Register the RuntimeManager bound camelcontext.
-        try {
-            ServiceRegistry.get().register(runtimeManagerId + "_CamelService", camelContext);
-
-            WorkItemImpl workItem = new WorkItemImpl();
-            workItem.setParameter(JBPMConstants.CAMEL_ENDPOINT_ID_WI_PARAM, camelEndpointId);
-            workItem.setParameter("Request", "someRequest");
-            workItem.setParameter("HandleExceptions", true);
-            workItem.setDeploymentId("testDeploymentId");
-            workItem.setProcessInstanceId(1L);
-            workItem.setId(1L);
-
-            AbstractCamelWorkItemHandler handler = new InOutCamelWorkItemHandler(runtimeManager);
-
-            TestWorkItemManager manager = new TestWorkItemManager();
-            try {
-                handler.executeWorkItem(workItem, manager);
-                throw new RuntimeException("The test expects an exception. This code should never be reached.");
-            } catch (Throwable wihRe) {
-                assertThat(wihRe, is(instanceOf(WorkItemHandlerRuntimeException.class)));
-                assertThat(wihRe.getCause(), is(instanceOf(ToBeHandledException.class)));
-            }
-
-        } finally {
-            ServiceRegistry.get().remove(runtimeManagerId + "_CamelService");
-        }
-    }
-    
-    @Test
-    public void testExecuteInOutLocalCamelContextNotHandleException() throws Exception {
-
-        String camelEndpointId = "testCamelRoute";
-        String camelRouteUri = "direct:" + camelEndpointId;
-
-        String testReponse = "testResponse";
-
-        String runtimeManagerId = "testRuntimeManager";
-
-        when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId);
-
-        //Throw an error back to the WIH
-        when(producerTemplate.send(eq(camelRouteUri), ArgumentMatchers.any(Exchange.class))).thenThrow(new NotToBeHandledException());
-        when(producerTemplate.getCamelContext()).thenReturn(camelContext);
-
-        when(camelContext.createProducerTemplate()).thenReturn(producerTemplate);
-        HeadersMapFactory hmf = new DefaultHeadersMapFactory();
-        when(camelContext.getHeadersMapFactory()).thenReturn(hmf);
-
-        // Register the RuntimeManager bound camelcontext.
-        try {
-            ServiceRegistry.get().register(runtimeManagerId + "_CamelService", camelContext);
-
-            WorkItemImpl workItem = new WorkItemImpl();
-            workItem.setParameter(JBPMConstants.CAMEL_ENDPOINT_ID_WI_PARAM, camelEndpointId);
-            workItem.setParameter("Request", "someRequest");
-            workItem.setParameter("HandleExceptions", false);
-            workItem.setDeploymentId("testDeploymentId");
-            workItem.setProcessInstanceId(1L);
-            workItem.setId(1L);
-
-            AbstractCamelWorkItemHandler handler = new InOutCamelWorkItemHandler(runtimeManager);
-
-            TestWorkItemManager manager = new TestWorkItemManager();
-            try {
-                handler.executeWorkItem(workItem, manager);
-                throw new RuntimeException("The test expects an exception. This code should never be reached.");
-            } catch (Throwable wihRe) {
-                assertThat(wihRe, is(instanceOf(NotToBeHandledException.class)));
-            }
-
-        } finally {
-            ServiceRegistry.get().remove(runtimeManagerId + "_CamelService");
-        }
-    }
-    
-    public static class ToBeHandledException extends RuntimeException {
-    }
-    
-    public static class NotToBeHandledException extends RuntimeException {
-    }
-
-}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 4e94753..a982e72 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -29,6 +29,7 @@ import org.apache.camel.AggregationStrategy;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Message;
 import org.apache.camel.TypeConverter;
 import org.apache.camel.impl.DefaultCamelContext;
@@ -63,7 +64,7 @@ public class KafkaProducerTest {
     private TypeConverter converter = Mockito.mock(TypeConverter.class);
     private CamelContext context = Mockito.mock(CamelContext.class);
     private Exchange exchange = Mockito.mock(Exchange.class);
-    private CamelContext camelContext = Mockito.mock(CamelContext.class);
+    private ExtendedCamelContext camelContext = Mockito.mock(ExtendedCamelContext.class);
     private Message in = new DefaultMessage(camelContext);
     private Message out = new DefaultMessage(camelContext);
     private AsyncCallback callback = Mockito.mock(AsyncCallback.class);
@@ -87,7 +88,8 @@ public class KafkaProducerTest {
         Mockito.when(exchange.getContext()).thenReturn(context);
         Mockito.when(context.getTypeConverter()).thenReturn(converter);
         Mockito.when(converter.tryConvertTo(String.class, exchange, null)).thenReturn(null);
-        Mockito.when(camelContext.getHeadersMapFactory()).thenReturn(new DefaultHeadersMapFactory());
+        Mockito.when(camelContext.adapt(ExtendedCamelContext.class)).thenReturn(camelContext);
+        Mockito.when(camelContext.adapt(ExtendedCamelContext.class).getHeadersMapFactory()).thenReturn(new DefaultHeadersMapFactory());
         Mockito.when(camelContext.getTypeConverter()).thenReturn(converter);
 
         producer.setKafkaProducer(kp);
diff --git a/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java b/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
index 4d68bc0..5557825 100644
--- a/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
+++ b/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
@@ -38,6 +38,7 @@ import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Expression;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Handler;
 import org.apache.camel.Message;
 import org.apache.camel.Predicate;
@@ -548,7 +549,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
             expectedMinimumMessageCount(1);
         }
         if (expectedHeaderValues == null) {
-            expectedHeaderValues = getCamelContext().getHeadersMapFactory().newMap();
+            expectedHeaderValues = getCamelContext().adapt(ExtendedCamelContext.class).getHeadersMapFactory().newMap();
             // we just wants to expects to be called once
             expects(new AssertionTask() {
                 @Override
@@ -1565,7 +1566,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
 
         if (expectedHeaderValues != null) {
             if (actualHeaderValues == null) {
-                actualHeaderValues = getCamelContext().getHeadersMapFactory().newMap();
+                actualHeaderValues = getCamelContext().adapt(ExtendedCamelContext.class).getHeadersMapFactory().newMap();
             }
             if (in.hasHeaders()) {
                 actualHeaderValues.putAll(in.getHeaders());
@@ -1574,7 +1575,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
 
         if (expectedPropertyValues != null) {
             if (actualPropertyValues == null) {
-                actualPropertyValues = getCamelContext().getHeadersMapFactory().newMap();
+                actualPropertyValues = getCamelContext().adapt(ExtendedCamelContext.class).getHeadersMapFactory().newMap();
             }
             actualPropertyValues.putAll(copy.getProperties());
         }
diff --git a/components/camel-openstack/src/test/java/org/apache/camel/component/openstack/AbstractProducerTestSupport.java b/components/camel-openstack/src/test/java/org/apache/camel/component/openstack/AbstractProducerTestSupport.java
index 86aeddf..8406e84 100644
--- a/components/camel-openstack/src/test/java/org/apache/camel/component/openstack/AbstractProducerTestSupport.java
+++ b/components/camel-openstack/src/test/java/org/apache/camel/component/openstack/AbstractProducerTestSupport.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Message;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.engine.DefaultHeadersMapFactory;
@@ -42,7 +43,7 @@ public abstract class AbstractProducerTestSupport {
     protected Exchange exchange;
 
     @Mock
-    protected CamelContext camelContext;
+    protected ExtendedCamelContext camelContext;
 
     protected Message msg;
 
@@ -52,6 +53,7 @@ public abstract class AbstractProducerTestSupport {
     public void before() throws IOException {
         msg = new DefaultMessage(camelContext);
         when(exchange.getIn()).thenReturn(msg);
-        when(camelContext.getHeadersMapFactory()).thenReturn(new DefaultHeadersMapFactory());
+        when(camelContext.adapt(ExtendedCamelContext.class)).thenReturn(camelContext);
+        when(camelContext.adapt(ExtendedCamelContext.class).getHeadersMapFactory()).thenReturn(new DefaultHeadersMapFactory());
     }
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
index 6b0396f..eb9dd95 100644
--- a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
+++ b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
@@ -25,7 +25,6 @@ import org.apache.camel.spi.DataType;
 import org.apache.camel.spi.Debugger;
 import org.apache.camel.spi.EndpointRegistry;
 import org.apache.camel.spi.ExecutorServiceManager;
-import org.apache.camel.spi.HeadersMapFactory;
 import org.apache.camel.spi.InflightRepository;
 import org.apache.camel.spi.Injector;
 import org.apache.camel.spi.Language;
@@ -34,7 +33,6 @@ import org.apache.camel.spi.ManagementNameStrategy;
 import org.apache.camel.spi.ManagementStrategy;
 import org.apache.camel.spi.MessageHistoryFactory;
 import org.apache.camel.spi.PropertiesComponent;
-import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.Registry;
 import org.apache.camel.spi.RestConfiguration;
 import org.apache.camel.spi.RestRegistry;
@@ -1272,24 +1270,4 @@ public interface CamelContext extends StatefulService, RuntimeConfiguration {
      */
     SSLContextParameters getSSLContextParameters();
 
-    /**
-     * Gets the {@link HeadersMapFactory} to use.
-     */
-    HeadersMapFactory getHeadersMapFactory();
-
-    /**
-     * Sets a custom {@link HeadersMapFactory} to be used.
-     */
-    void setHeadersMapFactory(HeadersMapFactory factory);
-
-    /**
-     * Gets the {@link ReactiveExecutor} to use.
-     */
-    ReactiveExecutor getReactiveExecutor();
-
-    /**
-     * Sets a custom {@link ReactiveExecutor} to be used.
-     */
-    void setReactiveExecutor(ReactiveExecutor reactiveExecutor);
-
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
index 7e38f0f..22efe7d 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
@@ -34,6 +34,7 @@ import org.apache.camel.spi.DeferServiceFactory;
 import org.apache.camel.spi.EndpointStrategy;
 import org.apache.camel.spi.FactoryFinder;
 import org.apache.camel.spi.FactoryFinderResolver;
+import org.apache.camel.spi.HeadersMapFactory;
 import org.apache.camel.spi.InterceptStrategy;
 import org.apache.camel.spi.LifecycleStrategy;
 import org.apache.camel.spi.LogListener;
@@ -43,6 +44,7 @@ import org.apache.camel.spi.NodeIdFactory;
 import org.apache.camel.spi.PackageScanClassResolver;
 import org.apache.camel.spi.PackageScanResourceResolver;
 import org.apache.camel.spi.ProcessorFactory;
+import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.Registry;
 import org.apache.camel.spi.RouteStartupOrder;
 import org.apache.camel.spi.UnitOfWorkFactory;
@@ -359,4 +361,23 @@ public interface ExtendedCamelContext extends CamelContext {
      */
     void setBeanIntrospection(BeanIntrospection beanIntrospection);
 
+    /**
+     * Gets the {@link HeadersMapFactory} used to create the header maps of messages.
+     */
+    HeadersMapFactory getHeadersMapFactory();
+
+    /**
+     * Sets a custom {@link HeadersMapFactory} to be used when creating the header maps of messages.
+     */
+    void setHeadersMapFactory(HeadersMapFactory factory);
+
+    /**
+     * Gets the {@link ReactiveExecutor} that the routing processors use to schedule their work.
+     */
+    ReactiveExecutor getReactiveExecutor();
+
+    /**
+     * Sets a custom {@link ReactiveExecutor} to be used. Several processors read it once in their constructor, so set it before they are created.
+     */
+    void setReactiveExecutor(ReactiveExecutor reactiveExecutor);
 }
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
index 7631456..fb67842 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.ExtendedExchange;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.StaticService;
@@ -87,7 +88,7 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
     }
 
     public void await(Exchange exchange, CountDownLatch latch) {
-        ReactiveExecutor reactiveExecutor = exchange.getContext().getReactiveExecutor();
+        ReactiveExecutor reactiveExecutor = exchange.getContext().adapt(ExtendedCamelContext.class).getReactiveExecutor();
         // Early exit for pending reactive queued work
         do {
             if (latch.getCount() <= 0) {
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 527a972..783fd2e 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -55,7 +55,6 @@ import org.apache.camel.spi.Transformer;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.spi.UnitOfWorkFactory;
 import org.apache.camel.support.CamelContextHelper;
-import org.apache.camel.support.DefaultConsumer;
 import org.apache.camel.support.MessageHelper;
 import org.apache.camel.support.OrderedComparator;
 import org.apache.camel.support.SynchronizationAdapter;
@@ -106,14 +105,14 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
 
     public CamelInternalProcessor(CamelContext camelContext) {
         this.camelContext = camelContext;
-        this.reactiveExecutor = camelContext.getReactiveExecutor();
+        this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor();
         this.shutdownStrategy = camelContext.getShutdownStrategy();
     }
 
     public CamelInternalProcessor(CamelContext camelContext, Processor processor) {
         super(processor);
         this.camelContext = camelContext;
-        this.reactiveExecutor = camelContext.getReactiveExecutor();
+        this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor();
         this.shutdownStrategy = camelContext.getShutdownStrategy();
     }
 
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java
index aae38ee..a70d550 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java
@@ -17,13 +17,16 @@
 package org.apache.camel.processor;
 
 import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.Traceable;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.support.processor.DelegateAsyncProcessor;
@@ -42,12 +45,16 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
 
     private String id;
     private String routeId;
+    private final CamelContext camelContext;
+    private final ReactiveExecutor reactiveExecutor;
     private final Expression expression;
     private final Predicate predicate;
     private final boolean copy;
 
-    public LoopProcessor(Processor processor, Expression expression, Predicate predicate, boolean copy) {
+    public LoopProcessor(CamelContext camelContext, Processor processor, Expression expression, Predicate predicate, boolean copy) {
         super(processor);
+        this.camelContext = camelContext;
+        this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor();
         this.expression = expression;
         this.predicate = predicate;
         this.copy = copy;
@@ -59,9 +66,9 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
             LoopState state = new LoopState(exchange, callback);
 
             if (exchange.isTransacted()) {
-                exchange.getContext().getReactiveExecutor().scheduleSync(state);
+                reactiveExecutor.scheduleSync(state);
             } else {
-                exchange.getContext().getReactiveExecutor().scheduleMain(state);
+                reactiveExecutor.scheduleMain(state);
             }
             return false;
         } catch (Exception e) {
@@ -117,7 +124,7 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
                     processor.process(current, doneSync -> {
                         // increment counter after done
                         index++;
-                        exchange.getContext().getReactiveExecutor().schedule(this);
+                        reactiveExecutor.schedule(this);
                     });
                 } else {
                     // we are done so prepare the result
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 7a4c0bd..82bafea 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -43,6 +43,7 @@ import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Endpoint;
 import org.apache.camel.ErrorHandlerFactory;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
@@ -50,6 +51,7 @@ import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.StreamCache;
 import org.apache.camel.Traceable;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.spi.UnitOfWork;
@@ -145,6 +147,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
 
     protected final Processor onPrepare;
     private final CamelContext camelContext;
+    private final ReactiveExecutor reactiveExecutor;
     private String id;
     private String routeId;
     private Collection<Processor> processors;
@@ -182,6 +185,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
                               boolean parallelAggregate, boolean stopOnAggregateException) {
         notNull(camelContext, "camelContext");
         this.camelContext = camelContext;
+        this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor();
         this.processors = processors;
         this.aggregationStrategy = aggregationStrategy;
         this.executorService = executorService;
@@ -246,12 +250,12 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
 
         MulticastState state = new MulticastState(exchange, pairs, callback);
         if (isParallelProcessing()) {
-            executorService.submit(() -> exchange.getContext().getReactiveExecutor().schedule(state));
+            executorService.submit(() -> reactiveExecutor.schedule(state));
         } else {
             if (exchange.isTransacted()) {
-                exchange.getContext().getReactiveExecutor().scheduleSync(state);
+                reactiveExecutor.scheduleSync(state);
             } else {
-                exchange.getContext().getReactiveExecutor().scheduleMain(state);
+                reactiveExecutor.scheduleMain(state);
             }
         }
 
@@ -263,9 +267,9 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
 
     protected void schedule(Runnable runnable) {
         if (isParallelProcessing()) {
-            executorService.submit(() -> camelContext.getReactiveExecutor().schedule(runnable));
+            executorService.submit(() -> reactiveExecutor.schedule(runnable)); // the scheduling itself runs as a task on executorService
         } else {
-            camelContext.getReactiveExecutor().schedule(runnable);
+            reactiveExecutor.schedule(runnable); // no hand-off: schedule from the current thread
         }
     }
 
@@ -552,7 +556,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
             original.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhaust);
         }
 
-        camelContext.getReactiveExecutor().schedule(callback);
+        reactiveExecutor.schedule(callback);
     }
 
     /**
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
index 3cfba1b..350c5e5 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -26,6 +26,7 @@ import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
 import org.apache.camel.Traceable;
@@ -58,7 +59,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
 
     public Pipeline(CamelContext camelContext, Collection<Processor> processors) {
         this.camelContext = camelContext;
-        this.reactiveExecutor = camelContext.getReactiveExecutor();
+        this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor();
         this.processors = processors.stream().map(AsyncProcessorConverterHelper::convert).collect(Collectors.toList());
     }
 
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
index 37d457c..5e23568 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
@@ -81,7 +81,7 @@ public class SharedCamelInternalProcessor {
 
     public SharedCamelInternalProcessor(CamelContext camelContext, CamelInternalProcessorAdvice... advices) {
         this.camelContext = camelContext;
-        this.reactiveExecutor = camelContext.getReactiveExecutor();
+        this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor();
         this.awaitManager = camelContext.adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager();
         this.shutdownStrategy = camelContext.getShutdownStrategy();
 
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java
index b29a275..cd08f60 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java
@@ -22,11 +22,14 @@ import java.util.List;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
 import org.apache.camel.Traceable;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
 import org.apache.camel.support.AsyncProcessorSupport;
@@ -42,13 +45,17 @@ public class TryProcessor extends AsyncProcessorSupport implements Navigate<Proc
 
     private static final Logger LOG = LoggerFactory.getLogger(TryProcessor.class);
 
+    protected final CamelContext camelContext;
+    protected final ReactiveExecutor reactiveExecutor;
     protected String id;
     protected String routeId;
     protected final Processor tryProcessor;
     protected final List<Processor> catchClauses;
     protected final Processor finallyProcessor;
 
-    public TryProcessor(Processor tryProcessor, List<Processor> catchClauses, Processor finallyProcessor) {
+    public TryProcessor(CamelContext camelContext, Processor tryProcessor, List<Processor> catchClauses, Processor finallyProcessor) {
+        this.camelContext = camelContext;
+        this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor();
         this.tryProcessor = tryProcessor;
         this.catchClauses = catchClauses;
         this.finallyProcessor = finallyProcessor;
@@ -66,7 +73,7 @@ public class TryProcessor extends AsyncProcessorSupport implements Navigate<Proc
 
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
-        exchange.getContext().getReactiveExecutor().schedule(new TryState(exchange, callback));
+        reactiveExecutor.schedule(new TryState(exchange, callback)); // executor cached by the constructor, no per-exchange lookup
         return false;
     }
 
@@ -95,7 +102,7 @@ public class TryProcessor extends AsyncProcessorSupport implements Navigate<Proc
                 Processor processor = processors.next();
                 AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
                 LOG.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
-                async.process(exchange, doneSync -> exchange.getContext().getReactiveExecutor().schedule(this));
+                async.process(exchange, doneSync -> reactiveExecutor.schedule(this));
             } else {
                 ExchangeHelper.prepareOutToIn(exchange);
                 exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 9eac797..a8bde2d 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -42,6 +42,7 @@ import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Navigate;
 import org.apache.camel.NoSuchEndpointException;
@@ -55,6 +56,7 @@ import org.apache.camel.spi.AggregationRepository;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.OptimisticLockingAggregationRepository;
+import org.apache.camel.spi.ReactiveExecutor;
 import org.apache.camel.spi.RecoverableAggregationRepository;
 import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.spi.ShutdownAware;
@@ -106,6 +108,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
     private volatile Lock lock;
     private final AtomicBoolean aggregateRepositoryWarned = new AtomicBoolean();
     private final CamelContext camelContext;
+    private final ReactiveExecutor reactiveExecutor;
     private final AsyncProcessor processor;
     private String id;
     private String routeId;
@@ -259,6 +262,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
         ObjectHelper.notNull(aggregationStrategy, "aggregationStrategy");
         ObjectHelper.notNull(executorService, "executorService");
         this.camelContext = camelContext;
+        this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor();
         this.processor = processor;
         this.correlationExpression = correlationExpression;
         this.aggregationStrategy = aggregationStrategy;
@@ -857,7 +861,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
 
         // send this exchange
         // the call to schedule last if needed to ensure in-order processing of the aggregates
-        executorService.submit(() -> camelContext.getReactiveExecutor().scheduleSync(() -> processor.process(exchange, done -> {
+        executorService.submit(() -> reactiveExecutor.scheduleSync(() -> processor.process(exchange, done -> {
             // log exception if there was a problem
             if (exchange.getException() != null) {
                 // if there was an exception then let the exception handler handle it
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 32d61bc..e9bb3bb 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -102,7 +102,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
         ObjectHelper.notNull(redeliveryPolicy, "RedeliveryPolicy", this);
 
         this.camelContext = camelContext;
-        this.reactiveExecutor = camelContext.getReactiveExecutor();
+        this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor();
         this.awaitManager = camelContext.adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager();
         this.shutdownStrategy = camelContext.getShutdownStrategy();
         this.redeliveryProcessor = redeliveryProcessor;
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
index 3f1a15e..286ea75 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
@@ -25,6 +25,7 @@ import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Traceable;
 import org.apache.camel.support.DefaultConsumer;
 import org.apache.camel.support.ExchangeHelper;
@@ -164,7 +165,7 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab
     @Override
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
         AsyncProcessor[] processors = doGetProcessors();
-        exchange.getContext().getReactiveExecutor().schedule(new State(exchange, callback, processors)::run);
+        exchange.getContext().adapt(ExtendedCamelContext.class).getReactiveExecutor().schedule(new State(exchange, callback, processors)::run);
         return false;
     }
 
@@ -251,7 +252,7 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab
 
             // process the exchange
             LOG.debug("Processing failover at attempt {} for {}", attempts, copy);
-            processor.process(copy, doneSync -> exchange.getContext().getReactiveExecutor().schedule(this::run));
+            processor.process(copy, doneSync -> exchange.getContext().adapt(ExtendedCamelContext.class).getReactiveExecutor().schedule(this::run));
         }
 
     }
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
index e8171b7..433a9fd 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
@@ -19,6 +19,7 @@ package org.apache.camel.processor.loadbalancer;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.Processor;
 
 /**
@@ -33,7 +34,7 @@ public class TopicLoadBalancer extends LoadBalancerSupport {
     @Override
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
         AsyncProcessor[] processors = doGetProcessors();
-        exchange.getContext().getReactiveExecutor().schedule(new State(exchange, callback, processors)::run);
+        exchange.getContext().adapt(ExtendedCamelContext.class).getReactiveExecutor().schedule(new State(exchange, callback, processors)::run);
         return false;
     }
 
@@ -64,7 +65,7 @@ public class TopicLoadBalancer extends LoadBalancerSupport {
                 exchange.setException(current.getException());
                 callback.done(false);
             } else {
-                exchange.getContext().getReactiveExecutor().schedule(this::run);
+                exchange.getContext().adapt(ExtendedCamelContext.class).getReactiveExecutor().schedule(this::run);
             }
         }
     }
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/LoopReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/LoopReifier.java
index 434abc0..464599a 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/LoopReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/LoopReifier.java
@@ -43,7 +43,7 @@ public class LoopReifier extends ExpressionReifier<LoopDefinition> {
         } else {
             expression = definition.getExpression().createExpression(routeContext);
         }
-        return new LoopProcessor(output, expression, predicate, isCopy);
+        return new LoopProcessor(routeContext.getCamelContext(), output, expression, predicate, isCopy);
     }
 
 }
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/TryReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/TryReifier.java
index 4809de5..1a88847 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/TryReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/TryReifier.java
@@ -59,7 +59,7 @@ public class TryReifier extends ProcessorReifier<TryDefinition> {
             throw new IllegalArgumentException("doTry must have one or more catch or finally blocks on " + this);
         }
 
-        return new TryProcessor(tryProcessor, catchProcessors, finallyProcessor);
+        return new TryProcessor(routeContext.getCamelContext(), tryProcessor, catchProcessors, finallyProcessor);
     }
 
 }
diff --git a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
index 4cc4729..e529703 100644
--- a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
+++ b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
@@ -260,7 +260,7 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex
         HeadersMapFactory headersMapFactory = getBeanForType(HeadersMapFactory.class);
         if (headersMapFactory != null) {
             LOG.info("Using custom HeadersMapFactory: {}", headersMapFactory);
-            getContext().setHeadersMapFactory(headersMapFactory);
+            getContext().adapt(ExtendedCamelContext.class).setHeadersMapFactory(headersMapFactory);
         }
         JSonSchemaResolver jsonSchemaResolver = getBeanForType(JSonSchemaResolver.class);
         if (jsonSchemaResolver != null) {
@@ -1231,7 +1231,7 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex
         ReactiveExecutor reactiveExecutor = getBeanForType(ReactiveExecutor.class);
         if (reactiveExecutor != null) {
             // already logged in CamelContext
-            getContext().setReactiveExecutor(reactiveExecutor);
+            getContext().adapt(ExtendedCamelContext.class).setReactiveExecutor(reactiveExecutor);
         }
     }
 }
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/CustomHeadersMapFactoryRouteTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/CustomHeadersMapFactoryRouteTest.java
index 1d78021..be908ea 100644
--- a/core/camel-core/src/test/java/org/apache/camel/impl/CustomHeadersMapFactoryRouteTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/CustomHeadersMapFactoryRouteTest.java
@@ -21,6 +21,7 @@ import java.util.Map;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.spi.HeadersMapFactory;
 import org.junit.Test;
@@ -32,7 +33,7 @@ public class CustomHeadersMapFactoryRouteTest extends ContextTestSupport {
     @Override
     protected CamelContext createCamelContext() throws Exception {
         CamelContext context = super.createCamelContext();
-        context.setHeadersMapFactory(custom);
+        context.adapt(ExtendedCamelContext.class).setHeadersMapFactory(custom);
         return context;
     }
 
@@ -51,7 +52,7 @@ public class CustomHeadersMapFactoryRouteTest extends ContextTestSupport {
 
         assertMockEndpointsSatisfied();
 
-        assertSame(custom, context.getHeadersMapFactory());
+        assertSame(custom, context.adapt(ExtendedCamelContext.class).getHeadersMapFactory());
     }
 
     @Override
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/HashMapHeadersMapFactoryRouteTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/HashMapHeadersMapFactoryRouteTest.java
index d906a80..052c7f4 100644
--- a/core/camel-core/src/test/java/org/apache/camel/impl/HashMapHeadersMapFactoryRouteTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/HashMapHeadersMapFactoryRouteTest.java
@@ -21,6 +21,7 @@ import java.util.Map;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.impl.engine.HashMapHeadersMapFactory;
 import org.junit.Test;
@@ -30,7 +31,7 @@ public class HashMapHeadersMapFactoryRouteTest extends ContextTestSupport {
     @Override
     protected CamelContext createCamelContext() throws Exception {
         CamelContext context = super.createCamelContext();
-        context.setHeadersMapFactory(new HashMapHeadersMapFactory());
+        context.adapt(ExtendedCamelContext.class).setHeadersMapFactory(new HashMapHeadersMapFactory());
         return context;
     }
 
diff --git a/core/camel-headersmap/src/test/java/org/apache/camel/component/headersmap/CamelFastHeadersMapTest.java b/core/camel-headersmap/src/test/java/org/apache/camel/component/headersmap/CamelFastHeadersMapTest.java
index 9df52ae..616671b 100644
--- a/core/camel-headersmap/src/test/java/org/apache/camel/component/headersmap/CamelFastHeadersMapTest.java
+++ b/core/camel-headersmap/src/test/java/org/apache/camel/component/headersmap/CamelFastHeadersMapTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.headersmap;
 
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.spi.HeadersMapFactory;
@@ -34,7 +35,7 @@ public class CamelFastHeadersMapTest extends CamelTestSupport {
         assertMockEndpointsSatisfied();
 
         // should have detected custom and use that
-        HeadersMapFactory factory = context.getHeadersMapFactory();
+        HeadersMapFactory factory = context.adapt(ExtendedCamelContext.class).getHeadersMapFactory();
         assertIsInstanceOf(FastHeadersMapFactory.class, factory);
     }
 
diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
index f9cb86d..2b7841a 100644
--- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
+++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
@@ -143,7 +143,7 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti
 
     @Override
     public String getHeadersMapFactoryClassName() {
-        return context.getHeadersMapFactory().getClass().getName();
+        return context.adapt(ExtendedCamelContext.class).getHeadersMapFactory().getClass().getName();
     }
 
     @Override
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
index 4e86dd6..ac78c4d 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
@@ -29,6 +29,7 @@ import org.apache.camel.CamelExecutionException;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Message;
 import org.apache.camel.MessageHistory;
@@ -132,7 +133,7 @@ public final class DefaultExchange implements ExtendedExchange {
             return null;
         }
 
-        return context.getHeadersMapFactory().newMap(headers);
+        return context.adapt(ExtendedCamelContext.class).getHeadersMapFactory().newMap(headers);
     }
 
     @SuppressWarnings("unchecked")
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessage.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessage.java
index 9a4da71..392191c 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessage.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessage.java
@@ -23,6 +23,7 @@ import java.util.function.Supplier;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.spi.HeadersMapFactory;
 import org.apache.camel.util.ObjectHelper;
 
@@ -212,11 +213,11 @@ public class DefaultMessage extends MessageSupport {
     public void setHeaders(Map<String, Object> headers) {
         ObjectHelper.notNull(getCamelContext(), "CamelContext", this);
 
-        if (getCamelContext().getHeadersMapFactory().isInstanceOf(headers)) {
+        if (getCamelContext().adapt(ExtendedCamelContext.class).getHeadersMapFactory().isInstanceOf(headers)) {
             this.headers = headers;
         } else {
             // create a new map
-            this.headers = getCamelContext().getHeadersMapFactory().newMap(headers);
+            this.headers = getCamelContext().adapt(ExtendedCamelContext.class).getHeadersMapFactory().newMap(headers);
         }
     }
 
@@ -247,7 +248,7 @@ public class DefaultMessage extends MessageSupport {
     protected Map<String, Object> createHeaders() {
         ObjectHelper.notNull(getCamelContext(), "CamelContext", this);
 
-        Map<String, Object> map = getCamelContext().getHeadersMapFactory().newMap();
+        Map<String, Object> map = getCamelContext().adapt(ExtendedCamelContext.class).getHeadersMapFactory().newMap();
         populateInitialHeaders(map);
         return map;
     }