You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/01/24 13:57:34 UTC
[camel] 02/02: CAMEL-14435: Optimize core
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit e1c93a38abf6ad225f5b64c0bc768e1e134f8543
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Fri Jan 24 14:57:06 2020 +0100
CAMEL-14435: Optimize core
---
.../workitem/InOnlyCamelWorkItemHandlerTest.java | 101 ------
.../workitem/InOutCamelWorkItemHandlerTest.java | 382 ---------------------
.../camel/component/kafka/KafkaProducerTest.java | 6 +-
.../apache/camel/component/mock/MockEndpoint.java | 7 +-
.../openstack/AbstractProducerTestSupport.java | 6 +-
.../main/java/org/apache/camel/CamelContext.java | 22 --
.../org/apache/camel/ExtendedCamelContext.java | 21 ++
.../engine/DefaultAsyncProcessorAwaitManager.java | 3 +-
.../camel/processor/CamelInternalProcessor.java | 5 +-
.../org/apache/camel/processor/LoopProcessor.java | 15 +-
.../apache/camel/processor/MulticastProcessor.java | 16 +-
.../java/org/apache/camel/processor/Pipeline.java | 3 +-
.../processor/SharedCamelInternalProcessor.java | 2 +-
.../org/apache/camel/processor/TryProcessor.java | 13 +-
.../processor/aggregate/AggregateProcessor.java | 6 +-
.../errorhandler/RedeliveryErrorHandler.java | 2 +-
.../loadbalancer/FailOverLoadBalancer.java | 5 +-
.../processor/loadbalancer/TopicLoadBalancer.java | 5 +-
.../java/org/apache/camel/reifier/LoopReifier.java | 2 +-
.../java/org/apache/camel/reifier/TryReifier.java | 2 +-
.../core/xml/AbstractCamelContextFactoryBean.java | 4 +-
.../impl/CustomHeadersMapFactoryRouteTest.java | 5 +-
.../impl/HashMapHeadersMapFactoryRouteTest.java | 3 +-
.../headersmap/CamelFastHeadersMapTest.java | 3 +-
.../management/mbean/ManagedCamelContext.java | 2 +-
.../org/apache/camel/support/DefaultExchange.java | 3 +-
.../org/apache/camel/support/DefaultMessage.java | 7 +-
27 files changed, 101 insertions(+), 550 deletions(-)
diff --git a/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/InOnlyCamelWorkItemHandlerTest.java b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/InOnlyCamelWorkItemHandlerTest.java
deleted file mode 100644
index 820b50d..0000000
--- a/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/InOnlyCamelWorkItemHandlerTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.jbpm.workitem;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.ProducerTemplate;
-import org.apache.camel.component.jbpm.JBPMConstants;
-import org.apache.camel.impl.engine.DefaultHeadersMapFactory;
-import org.apache.camel.spi.HeadersMapFactory;
-import org.drools.core.process.instance.impl.WorkItemImpl;
-import org.jbpm.process.workitem.core.TestWorkItemManager;
-import org.jbpm.services.api.service.ServiceRegistry;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.kie.api.runtime.manager.RuntimeManager;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import static org.hamcrest.CoreMatchers.*;
-import static org.junit.Assert.*;
-import static org.mockito.ArgumentMatchers.*;
-import static org.mockito.Mockito.*;
-
-@RunWith(MockitoJUnitRunner.class)
-public class InOnlyCamelWorkItemHandlerTest {
-
- @Mock
- ProducerTemplate producerTemplate;
-
- @Mock
- Exchange outExchange;
-
- @Mock
- Message outMessage;
-
- @Mock
- CamelContext camelContext;
-
- @Mock
- RuntimeManager runtimeManager;
-
- @Test
- public void testExecuteInOnlyLocalCamelContext() throws Exception {
-
- String camelEndpointId = "testCamelRoute";
- String camelRouteUri = "direct:" + camelEndpointId;
-
- String testReponse = "testResponse";
-
- String runtimeManagerId = "testRuntimeManager";
-
- when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId);
-
- when(producerTemplate.send(eq(camelRouteUri), ArgumentMatchers.any(Exchange.class))).thenReturn(outExchange);
- when(producerTemplate.getCamelContext()).thenReturn(camelContext);
-
- when(camelContext.createProducerTemplate()).thenReturn(producerTemplate);
- HeadersMapFactory hmf = new DefaultHeadersMapFactory();
- when(camelContext.getHeadersMapFactory()).thenReturn(hmf);
-
- // Register the RuntimeManager bound camelcontext.
- try {
- ServiceRegistry.get().register(runtimeManagerId + "_CamelService", camelContext);
-
- WorkItemImpl workItem = new WorkItemImpl();
- workItem.setParameter(JBPMConstants.CAMEL_ENDPOINT_ID_WI_PARAM, camelEndpointId);
- workItem.setParameter("Request", "someRequest");
- workItem.setDeploymentId("testDeploymentId");
- workItem.setProcessInstanceId(1L);
- workItem.setId(1L);
-
- AbstractCamelWorkItemHandler handler = new InOnlyCamelWorkItemHandler(runtimeManager);
-
- TestWorkItemManager manager = new TestWorkItemManager();
- handler.executeWorkItem(workItem,
- manager);
- assertThat(manager.getResults(), is(notNullValue()));
- // InOnly does not complete WorkItem.
- assertThat(manager.getResults().size(), equalTo(0));
- } finally {
- ServiceRegistry.get().remove(runtimeManagerId + "_CamelService");
- }
- }
-}
diff --git a/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/InOutCamelWorkItemHandlerTest.java b/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/InOutCamelWorkItemHandlerTest.java
deleted file mode 100644
index 32a967d..0000000
--- a/components/camel-jbpm/src/test/java/org/apache/camel/component/jbpm/workitem/InOutCamelWorkItemHandlerTest.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.jbpm.workitem;
-
-import java.util.Map;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.ProducerTemplate;
-import org.apache.camel.component.jbpm.JBPMConstants;
-import org.apache.camel.impl.engine.DefaultHeadersMapFactory;
-import org.apache.camel.spi.HeadersMapFactory;
-import org.drools.core.process.instance.impl.WorkItemImpl;
-import org.jbpm.bpmn2.handler.WorkItemHandlerRuntimeException;
-import org.jbpm.process.workitem.core.TestWorkItemManager;
-import org.jbpm.services.api.service.ServiceRegistry;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.kie.api.runtime.manager.RuntimeManager;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-import static org.hamcrest.CoreMatchers.*;
-import static org.junit.Assert.*;
-import static org.mockito.ArgumentMatchers.*;
-import static org.mockito.Mockito.*;
-
-@RunWith(MockitoJUnitRunner.class)
-public class InOutCamelWorkItemHandlerTest {
-
- @Mock
- ProducerTemplate producerTemplate;
-
- @Mock
- Exchange outExchange;
-
- @Mock
- Message outMessage;
-
- @Mock
- CamelContext camelContext;
-
- @Mock
- RuntimeManager runtimeManager;
-
- @Test
- public void testExecuteInOutGlobalCamelContext() throws Exception {
-
- String camelEndpointId = "testCamelRoute";
- String camelRouteUri = "direct:" + camelEndpointId;
-
- String testReponse = "testResponse";
-
- when(producerTemplate.send(eq(camelRouteUri), ArgumentMatchers.any(Exchange.class))).thenReturn(outExchange);
- when(producerTemplate.getCamelContext()).thenReturn(camelContext);
-
- when(camelContext.createProducerTemplate()).thenReturn(producerTemplate);
- HeadersMapFactory hmf = new DefaultHeadersMapFactory();
- when(camelContext.getHeadersMapFactory()).thenReturn(hmf);
-
- when(outExchange.getOut()).thenReturn(outMessage);
- when(outMessage.getBody()).thenReturn(testReponse);
-
- try {
- ServiceRegistry.get().register("GlobalCamelService", camelContext);
-
- TestWorkItemManager manager = new TestWorkItemManager();
- WorkItemImpl workItem = new WorkItemImpl();
- workItem.setParameter("CamelEndpointId", camelEndpointId);
- workItem.setParameter("Request", "someRequest");
- workItem.setDeploymentId("testDeploymentId");
- workItem.setProcessInstanceId(1L);
- workItem.setId(1L);
-
- AbstractCamelWorkItemHandler handler = new InOutCamelWorkItemHandler();
-
- handler.executeWorkItem(workItem, manager);
- assertThat(manager.getResults(), is(notNullValue()));
- assertThat(manager.getResults().size(), equalTo(1));
- assertThat(manager.getResults().containsKey(workItem.getId()), is(true));
- Map<String, Object> results = manager.getResults(workItem.getId());
- assertThat(results.size(), equalTo(2));
- assertThat(results.get("Response"), equalTo(testReponse));
-
- } finally {
- ServiceRegistry.get().remove("GlobalCamelService");
- }
-
- }
-
- @Test
- public void testExecuteInOutLocalCamelContext() throws Exception {
-
- String camelEndpointId = "testCamelRoute";
- String camelRouteUri = "direct:" + camelEndpointId;
-
- String testReponse = "testResponse";
-
- String runtimeManagerId = "testRuntimeManager";
-
- when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId);
-
- when(producerTemplate.send(eq(camelRouteUri), ArgumentMatchers.any(Exchange.class))).thenReturn(outExchange);
- when(producerTemplate.getCamelContext()).thenReturn(camelContext);
-
- when(camelContext.createProducerTemplate()).thenReturn(producerTemplate);
- HeadersMapFactory hmf = new DefaultHeadersMapFactory();
- when(camelContext.getHeadersMapFactory()).thenReturn(hmf);
-
- when(outExchange.getOut()).thenReturn(outMessage);
- when(outMessage.getBody()).thenReturn(testReponse);
-
- // Register the RuntimeManager bound camelcontext.
- try {
- ServiceRegistry.get().register(runtimeManagerId + "_CamelService", camelContext);
-
- WorkItemImpl workItem = new WorkItemImpl();
- workItem.setParameter(JBPMConstants.CAMEL_ENDPOINT_ID_WI_PARAM, camelEndpointId);
- workItem.setParameter("Request", "someRequest");
- workItem.setDeploymentId("testDeploymentId");
- workItem.setProcessInstanceId(1L);
- workItem.setId(1L);
-
- AbstractCamelWorkItemHandler handler = new InOutCamelWorkItemHandler(runtimeManager);
-
- TestWorkItemManager manager = new TestWorkItemManager();
- handler.executeWorkItem(workItem, manager);
- assertThat(manager.getResults(), is(notNullValue()));
- assertThat(manager.getResults().size(), equalTo(1));
- assertThat(manager.getResults().containsKey(workItem.getId()), is(true));
-
- Map<String, Object> results = manager.getResults(workItem.getId());
- assertThat(results.size(), equalTo(2));
- assertThat(results.get(JBPMConstants.RESPONSE_WI_PARAM), equalTo(testReponse));
- } finally {
- ServiceRegistry.get().remove(runtimeManagerId + "_CamelService");
- }
- }
-
- @Test
- public void testExecuteInOutLocalCamelContextLazyInit() throws Exception {
-
- String camelEndpointId = "testCamelRoute";
- String camelRouteUri = "direct:" + camelEndpointId;
-
- String testReponse = "testResponse";
-
- String runtimeManagerId = "testRuntimeManager";
-
- when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId);
-
- when(producerTemplate.send(eq(camelRouteUri), ArgumentMatchers.any(Exchange.class))).thenReturn(outExchange);
- when(producerTemplate.getCamelContext()).thenReturn(camelContext);
-
- when(camelContext.createProducerTemplate()).thenReturn(producerTemplate);
- HeadersMapFactory hmf = new DefaultHeadersMapFactory();
- when(camelContext.getHeadersMapFactory()).thenReturn(hmf);
-
- when(outExchange.getOut()).thenReturn(outMessage);
- when(outMessage.getBody()).thenReturn(testReponse);
-
- WorkItemImpl workItem = new WorkItemImpl();
- workItem.setParameter(JBPMConstants.CAMEL_ENDPOINT_ID_WI_PARAM, camelEndpointId);
- workItem.setParameter("Request", "someRequest");
- workItem.setDeploymentId("testDeploymentId");
- workItem.setProcessInstanceId(1L);
- workItem.setId(1L);
-
- AbstractCamelWorkItemHandler handler = new InOutCamelWorkItemHandler(runtimeManager);
-
- // Register the context after we've created the WIH to test lazy-init.
- try {
- ServiceRegistry.get().register(runtimeManagerId + "_CamelService", camelContext);
-
- TestWorkItemManager manager = new TestWorkItemManager();
- handler.executeWorkItem(workItem, manager);
- assertThat(manager.getResults(), is(notNullValue()));
- assertThat(manager.getResults().size(), equalTo(1));
- assertThat(manager.getResults().containsKey(workItem.getId()), is(true));
-
- Map<String, Object> results = manager.getResults(workItem.getId());
- assertThat(results.size(), equalTo(2));
- assertThat(results.get(JBPMConstants.RESPONSE_WI_PARAM), equalTo(testReponse));
- } finally {
- ServiceRegistry.get().remove(runtimeManagerId + "_CamelService");
- }
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testExecuteInOutLocalCamelContextLazyInitFail() throws Exception {
-
- String camelEndpointId = "testCamelRoute";
- String camelRouteUri = "direct:" + camelEndpointId;
-
- String testReponse = "testResponse";
-
- String runtimeManagerId = "testRuntimeManager";
-
- when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId);
-
- WorkItemImpl workItem = new WorkItemImpl();
- workItem.setParameter(JBPMConstants.CAMEL_ENDPOINT_ID_WI_PARAM, camelEndpointId);
- workItem.setParameter("Request", "someRequest");
- workItem.setDeploymentId("testDeploymentId");
- workItem.setProcessInstanceId(1L);
- workItem.setId(1L);
-
- AbstractCamelWorkItemHandler handler = new InOutCamelWorkItemHandler(runtimeManager);
-
- TestWorkItemManager manager = new TestWorkItemManager();
- // This is expected to throw an exception.
- handler.executeWorkItem(workItem, manager);
-
- }
-
- @Test
- public void testExecuteInOutLocalCamelContextDefaultHandleException() throws Exception {
-
- String camelEndpointId = "testCamelRoute";
- String camelRouteUri = "direct:" + camelEndpointId;
-
- String testReponse = "testResponse";
-
- String runtimeManagerId = "testRuntimeManager";
-
-
- when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId);
-
- //Throw an error back to the WIH
- when(producerTemplate.send(eq(camelRouteUri), ArgumentMatchers.any(Exchange.class))).thenThrow(new ToBeHandledException());
- when(producerTemplate.getCamelContext()).thenReturn(camelContext);
-
- when(camelContext.createProducerTemplate()).thenReturn(producerTemplate);
- HeadersMapFactory hmf = new DefaultHeadersMapFactory();
- when(camelContext.getHeadersMapFactory()).thenReturn(hmf);
-
- // Register the RuntimeManager bound camelcontext.
- try {
- ServiceRegistry.get().register(runtimeManagerId + "_CamelService", camelContext);
-
- WorkItemImpl workItem = new WorkItemImpl();
- workItem.setParameter(JBPMConstants.CAMEL_ENDPOINT_ID_WI_PARAM, camelEndpointId);
- workItem.setParameter("Request", "someRequest");
- workItem.setDeploymentId("testDeploymentId");
- workItem.setProcessInstanceId(1L);
- workItem.setId(1L);
-
- AbstractCamelWorkItemHandler handler = new InOutCamelWorkItemHandler(runtimeManager);
-
- TestWorkItemManager manager = new TestWorkItemManager();
- try {
- handler.executeWorkItem(workItem, manager);
- throw new RuntimeException("The test expects an exception. This code should never be reached.");
- } catch (Throwable wihRe) {
- assertThat(wihRe, is(instanceOf(WorkItemHandlerRuntimeException.class)));
- assertThat(wihRe.getCause(), is(instanceOf(ToBeHandledException.class)));
- }
-
- } finally {
- ServiceRegistry.get().remove(runtimeManagerId + "_CamelService");
- }
- }
-
- @Test
- public void testExecuteInOutLocalCamelContextExplicitHandleException() throws Exception {
-
- String camelEndpointId = "testCamelRoute";
- String camelRouteUri = "direct:" + camelEndpointId;
-
- String testReponse = "testResponse";
-
- String runtimeManagerId = "testRuntimeManager";
-
-
- when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId);
-
- //Throw an error back to the WIH
- when(producerTemplate.send(eq(camelRouteUri), ArgumentMatchers.any(Exchange.class))).thenThrow(new ToBeHandledException());
- when(producerTemplate.getCamelContext()).thenReturn(camelContext);
-
- when(camelContext.createProducerTemplate()).thenReturn(producerTemplate);
- HeadersMapFactory hmf = new DefaultHeadersMapFactory();
- when(camelContext.getHeadersMapFactory()).thenReturn(hmf);
-
- // Register the RuntimeManager bound camelcontext.
- try {
- ServiceRegistry.get().register(runtimeManagerId + "_CamelService", camelContext);
-
- WorkItemImpl workItem = new WorkItemImpl();
- workItem.setParameter(JBPMConstants.CAMEL_ENDPOINT_ID_WI_PARAM, camelEndpointId);
- workItem.setParameter("Request", "someRequest");
- workItem.setParameter("HandleExceptions", true);
- workItem.setDeploymentId("testDeploymentId");
- workItem.setProcessInstanceId(1L);
- workItem.setId(1L);
-
- AbstractCamelWorkItemHandler handler = new InOutCamelWorkItemHandler(runtimeManager);
-
- TestWorkItemManager manager = new TestWorkItemManager();
- try {
- handler.executeWorkItem(workItem, manager);
- throw new RuntimeException("The test expects an exception. This code should never be reached.");
- } catch (Throwable wihRe) {
- assertThat(wihRe, is(instanceOf(WorkItemHandlerRuntimeException.class)));
- assertThat(wihRe.getCause(), is(instanceOf(ToBeHandledException.class)));
- }
-
- } finally {
- ServiceRegistry.get().remove(runtimeManagerId + "_CamelService");
- }
- }
-
- @Test
- public void testExecuteInOutLocalCamelContextNotHandleException() throws Exception {
-
- String camelEndpointId = "testCamelRoute";
- String camelRouteUri = "direct:" + camelEndpointId;
-
- String testReponse = "testResponse";
-
- String runtimeManagerId = "testRuntimeManager";
-
- when(runtimeManager.getIdentifier()).thenReturn(runtimeManagerId);
-
- //Throw an error back to the WIH
- when(producerTemplate.send(eq(camelRouteUri), ArgumentMatchers.any(Exchange.class))).thenThrow(new NotToBeHandledException());
- when(producerTemplate.getCamelContext()).thenReturn(camelContext);
-
- when(camelContext.createProducerTemplate()).thenReturn(producerTemplate);
- HeadersMapFactory hmf = new DefaultHeadersMapFactory();
- when(camelContext.getHeadersMapFactory()).thenReturn(hmf);
-
- // Register the RuntimeManager bound camelcontext.
- try {
- ServiceRegistry.get().register(runtimeManagerId + "_CamelService", camelContext);
-
- WorkItemImpl workItem = new WorkItemImpl();
- workItem.setParameter(JBPMConstants.CAMEL_ENDPOINT_ID_WI_PARAM, camelEndpointId);
- workItem.setParameter("Request", "someRequest");
- workItem.setParameter("HandleExceptions", false);
- workItem.setDeploymentId("testDeploymentId");
- workItem.setProcessInstanceId(1L);
- workItem.setId(1L);
-
- AbstractCamelWorkItemHandler handler = new InOutCamelWorkItemHandler(runtimeManager);
-
- TestWorkItemManager manager = new TestWorkItemManager();
- try {
- handler.executeWorkItem(workItem, manager);
- throw new RuntimeException("The test expects an exception. This code should never be reached.");
- } catch (Throwable wihRe) {
- assertThat(wihRe, is(instanceOf(NotToBeHandledException.class)));
- }
-
- } finally {
- ServiceRegistry.get().remove(runtimeManagerId + "_CamelService");
- }
- }
-
- public static class ToBeHandledException extends RuntimeException {
- }
-
- public static class NotToBeHandledException extends RuntimeException {
- }
-
-}
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 4e94753..a982e72 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -29,6 +29,7 @@ import org.apache.camel.AggregationStrategy;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Message;
import org.apache.camel.TypeConverter;
import org.apache.camel.impl.DefaultCamelContext;
@@ -63,7 +64,7 @@ public class KafkaProducerTest {
private TypeConverter converter = Mockito.mock(TypeConverter.class);
private CamelContext context = Mockito.mock(CamelContext.class);
private Exchange exchange = Mockito.mock(Exchange.class);
- private CamelContext camelContext = Mockito.mock(CamelContext.class);
+ private ExtendedCamelContext camelContext = Mockito.mock(ExtendedCamelContext.class);
private Message in = new DefaultMessage(camelContext);
private Message out = new DefaultMessage(camelContext);
private AsyncCallback callback = Mockito.mock(AsyncCallback.class);
@@ -87,7 +88,8 @@ public class KafkaProducerTest {
Mockito.when(exchange.getContext()).thenReturn(context);
Mockito.when(context.getTypeConverter()).thenReturn(converter);
Mockito.when(converter.tryConvertTo(String.class, exchange, null)).thenReturn(null);
- Mockito.when(camelContext.getHeadersMapFactory()).thenReturn(new DefaultHeadersMapFactory());
+ Mockito.when(camelContext.adapt(ExtendedCamelContext.class)).thenReturn(camelContext);
+ Mockito.when(camelContext.adapt(ExtendedCamelContext.class).getHeadersMapFactory()).thenReturn(new DefaultHeadersMapFactory());
Mockito.when(camelContext.getTypeConverter()).thenReturn(converter);
producer.setKafkaProducer(kp);
diff --git a/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java b/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
index 4d68bc0..5557825 100644
--- a/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
+++ b/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
@@ -38,6 +38,7 @@ import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Expression;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Handler;
import org.apache.camel.Message;
import org.apache.camel.Predicate;
@@ -548,7 +549,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
expectedMinimumMessageCount(1);
}
if (expectedHeaderValues == null) {
- expectedHeaderValues = getCamelContext().getHeadersMapFactory().newMap();
+ expectedHeaderValues = getCamelContext().adapt(ExtendedCamelContext.class).getHeadersMapFactory().newMap();
// we just wants to expects to be called once
expects(new AssertionTask() {
@Override
@@ -1565,7 +1566,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
if (expectedHeaderValues != null) {
if (actualHeaderValues == null) {
- actualHeaderValues = getCamelContext().getHeadersMapFactory().newMap();
+ actualHeaderValues = getCamelContext().adapt(ExtendedCamelContext.class).getHeadersMapFactory().newMap();
}
if (in.hasHeaders()) {
actualHeaderValues.putAll(in.getHeaders());
@@ -1574,7 +1575,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
if (expectedPropertyValues != null) {
if (actualPropertyValues == null) {
- actualPropertyValues = getCamelContext().getHeadersMapFactory().newMap();
+ actualPropertyValues = getCamelContext().adapt(ExtendedCamelContext.class).getHeadersMapFactory().newMap();
}
actualPropertyValues.putAll(copy.getProperties());
}
diff --git a/components/camel-openstack/src/test/java/org/apache/camel/component/openstack/AbstractProducerTestSupport.java b/components/camel-openstack/src/test/java/org/apache/camel/component/openstack/AbstractProducerTestSupport.java
index 86aeddf..8406e84 100644
--- a/components/camel-openstack/src/test/java/org/apache/camel/component/openstack/AbstractProducerTestSupport.java
+++ b/components/camel-openstack/src/test/java/org/apache/camel/component/openstack/AbstractProducerTestSupport.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Message;
import org.apache.camel.Producer;
import org.apache.camel.impl.engine.DefaultHeadersMapFactory;
@@ -42,7 +43,7 @@ public abstract class AbstractProducerTestSupport {
protected Exchange exchange;
@Mock
- protected CamelContext camelContext;
+ protected ExtendedCamelContext camelContext;
protected Message msg;
@@ -52,6 +53,7 @@ public abstract class AbstractProducerTestSupport {
public void before() throws IOException {
msg = new DefaultMessage(camelContext);
when(exchange.getIn()).thenReturn(msg);
- when(camelContext.getHeadersMapFactory()).thenReturn(new DefaultHeadersMapFactory());
+ when(camelContext.adapt(ExtendedCamelContext.class)).thenReturn(camelContext);
+ when(camelContext.adapt(ExtendedCamelContext.class).getHeadersMapFactory()).thenReturn(new DefaultHeadersMapFactory());
}
}
diff --git a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
index 6b0396f..eb9dd95 100644
--- a/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
+++ b/core/camel-api/src/main/java/org/apache/camel/CamelContext.java
@@ -25,7 +25,6 @@ import org.apache.camel.spi.DataType;
import org.apache.camel.spi.Debugger;
import org.apache.camel.spi.EndpointRegistry;
import org.apache.camel.spi.ExecutorServiceManager;
-import org.apache.camel.spi.HeadersMapFactory;
import org.apache.camel.spi.InflightRepository;
import org.apache.camel.spi.Injector;
import org.apache.camel.spi.Language;
@@ -34,7 +33,6 @@ import org.apache.camel.spi.ManagementNameStrategy;
import org.apache.camel.spi.ManagementStrategy;
import org.apache.camel.spi.MessageHistoryFactory;
import org.apache.camel.spi.PropertiesComponent;
-import org.apache.camel.spi.ReactiveExecutor;
import org.apache.camel.spi.Registry;
import org.apache.camel.spi.RestConfiguration;
import org.apache.camel.spi.RestRegistry;
@@ -1272,24 +1270,4 @@ public interface CamelContext extends StatefulService, RuntimeConfiguration {
*/
SSLContextParameters getSSLContextParameters();
- /**
- * Gets the {@link HeadersMapFactory} to use.
- */
- HeadersMapFactory getHeadersMapFactory();
-
- /**
- * Sets a custom {@link HeadersMapFactory} to be used.
- */
- void setHeadersMapFactory(HeadersMapFactory factory);
-
- /**
- * Gets the {@link ReactiveExecutor} to use.
- */
- ReactiveExecutor getReactiveExecutor();
-
- /**
- * Sets a custom {@link ReactiveExecutor} to be used.
- */
- void setReactiveExecutor(ReactiveExecutor reactiveExecutor);
-
}
diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
index 7e38f0f..22efe7d 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
@@ -34,6 +34,7 @@ import org.apache.camel.spi.DeferServiceFactory;
import org.apache.camel.spi.EndpointStrategy;
import org.apache.camel.spi.FactoryFinder;
import org.apache.camel.spi.FactoryFinderResolver;
+import org.apache.camel.spi.HeadersMapFactory;
import org.apache.camel.spi.InterceptStrategy;
import org.apache.camel.spi.LifecycleStrategy;
import org.apache.camel.spi.LogListener;
@@ -43,6 +44,7 @@ import org.apache.camel.spi.NodeIdFactory;
import org.apache.camel.spi.PackageScanClassResolver;
import org.apache.camel.spi.PackageScanResourceResolver;
import org.apache.camel.spi.ProcessorFactory;
+import org.apache.camel.spi.ReactiveExecutor;
import org.apache.camel.spi.Registry;
import org.apache.camel.spi.RouteStartupOrder;
import org.apache.camel.spi.UnitOfWorkFactory;
@@ -359,4 +361,23 @@ public interface ExtendedCamelContext extends CamelContext {
*/
void setBeanIntrospection(BeanIntrospection beanIntrospection);
+ /**
+ * Gets the {@link HeadersMapFactory} to use.
+ */
+ HeadersMapFactory getHeadersMapFactory();
+
+ /**
+ * Sets a custom {@link HeadersMapFactory} to be used.
+ */
+ void setHeadersMapFactory(HeadersMapFactory factory);
+
+ /**
+ * Gets the {@link ReactiveExecutor} to use.
+ */
+ ReactiveExecutor getReactiveExecutor();
+
+ /**
+ * Sets a custom {@link ReactiveExecutor} to be used.
+ */
+ void setReactiveExecutor(ReactiveExecutor reactiveExecutor);
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
index 7631456..fb67842 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.StaticService;
@@ -87,7 +88,7 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
}
public void await(Exchange exchange, CountDownLatch latch) {
- ReactiveExecutor reactiveExecutor = exchange.getContext().getReactiveExecutor();
+ ReactiveExecutor reactiveExecutor = exchange.getContext().adapt(ExtendedCamelContext.class).getReactiveExecutor();
// Early exit for pending reactive queued work
do {
if (latch.getCount() <= 0) {
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 527a972..783fd2e 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -55,7 +55,6 @@ import org.apache.camel.spi.Transformer;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.spi.UnitOfWorkFactory;
import org.apache.camel.support.CamelContextHelper;
-import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.MessageHelper;
import org.apache.camel.support.OrderedComparator;
import org.apache.camel.support.SynchronizationAdapter;
@@ -106,14 +105,14 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
public CamelInternalProcessor(CamelContext camelContext) {
this.camelContext = camelContext;
- this.reactiveExecutor = camelContext.getReactiveExecutor();
+ this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor();
this.shutdownStrategy = camelContext.getShutdownStrategy();
}
public CamelInternalProcessor(CamelContext camelContext, Processor processor) {
super(processor);
this.camelContext = camelContext;
- this.reactiveExecutor = camelContext.getReactiveExecutor();
+ this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor();
this.shutdownStrategy = camelContext.getShutdownStrategy();
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java
index aae38ee..a70d550 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/LoopProcessor.java
@@ -17,13 +17,16 @@
package org.apache.camel.processor;
import org.apache.camel.AsyncCallback;
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ReactiveExecutor;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.processor.DelegateAsyncProcessor;
@@ -42,12 +45,16 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
private String id;
private String routeId;
+ private final CamelContext camelContext;
+ private final ReactiveExecutor reactiveExecutor;
private final Expression expression;
private final Predicate predicate;
private final boolean copy;
- public LoopProcessor(Processor processor, Expression expression, Predicate predicate, boolean copy) {
+ public LoopProcessor(CamelContext camelContext, Processor processor, Expression expression, Predicate predicate, boolean copy) {
super(processor);
+ this.camelContext = camelContext;
+ this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor();
this.expression = expression;
this.predicate = predicate;
this.copy = copy;
@@ -59,9 +66,9 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
LoopState state = new LoopState(exchange, callback);
if (exchange.isTransacted()) {
- exchange.getContext().getReactiveExecutor().scheduleSync(state);
+ reactiveExecutor.scheduleSync(state);
} else {
- exchange.getContext().getReactiveExecutor().scheduleMain(state);
+ reactiveExecutor.scheduleMain(state);
}
return false;
} catch (Exception e) {
@@ -117,7 +124,7 @@ public class LoopProcessor extends DelegateAsyncProcessor implements Traceable,
processor.process(current, doneSync -> {
// increment counter after done
index++;
- exchange.getContext().getReactiveExecutor().schedule(this);
+ reactiveExecutor.schedule(this);
});
} else {
// we are done so prepare the result
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 7a4c0bd..82bafea 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -43,6 +43,7 @@ import org.apache.camel.CamelExchangeException;
import org.apache.camel.Endpoint;
import org.apache.camel.ErrorHandlerFactory;
import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
@@ -50,6 +51,7 @@ import org.apache.camel.RuntimeCamelException;
import org.apache.camel.StreamCache;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ReactiveExecutor;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.spi.UnitOfWork;
@@ -145,6 +147,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
protected final Processor onPrepare;
private final CamelContext camelContext;
+ private final ReactiveExecutor reactiveExecutor;
private String id;
private String routeId;
private Collection<Processor> processors;
@@ -182,6 +185,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
boolean parallelAggregate, boolean stopOnAggregateException) {
notNull(camelContext, "camelContext");
this.camelContext = camelContext;
+ this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor();
this.processors = processors;
this.aggregationStrategy = aggregationStrategy;
this.executorService = executorService;
@@ -246,12 +250,12 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
MulticastState state = new MulticastState(exchange, pairs, callback);
if (isParallelProcessing()) {
- executorService.submit(() -> exchange.getContext().getReactiveExecutor().schedule(state));
+ executorService.submit(() -> reactiveExecutor.schedule(state));
} else {
if (exchange.isTransacted()) {
- exchange.getContext().getReactiveExecutor().scheduleSync(state);
+ reactiveExecutor.scheduleSync(state);
} else {
- exchange.getContext().getReactiveExecutor().scheduleMain(state);
+ reactiveExecutor.scheduleMain(state);
}
}
@@ -263,9 +267,9 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
protected void schedule(Runnable runnable) {
if (isParallelProcessing()) {
- executorService.submit(() -> camelContext.getReactiveExecutor().schedule(runnable));
+ executorService.submit(() -> reactiveExecutor.schedule(runnable));
} else {
- camelContext.getReactiveExecutor().schedule(runnable);
+ reactiveExecutor.schedule(runnable);
}
}
@@ -552,7 +556,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat
original.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhaust);
}
- camelContext.getReactiveExecutor().schedule(callback);
+ reactiveExecutor.schedule(callback);
}
/**
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
index 3cfba1b..350c5e5 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -26,6 +26,7 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.Traceable;
@@ -58,7 +59,7 @@ public class Pipeline extends AsyncProcessorSupport implements Navigate<Processo
public Pipeline(CamelContext camelContext, Collection<Processor> processors) {
this.camelContext = camelContext;
- this.reactiveExecutor = camelContext.getReactiveExecutor();
+ this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor();
this.processors = processors.stream().map(AsyncProcessorConverterHelper::convert).collect(Collectors.toList());
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
index 37d457c..5e23568 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
@@ -81,7 +81,7 @@ public class SharedCamelInternalProcessor {
public SharedCamelInternalProcessor(CamelContext camelContext, CamelInternalProcessorAdvice... advices) {
this.camelContext = camelContext;
- this.reactiveExecutor = camelContext.getReactiveExecutor();
+ this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor();
this.awaitManager = camelContext.adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager();
this.shutdownStrategy = camelContext.getShutdownStrategy();
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java
index b29a275..cd08f60 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/TryProcessor.java
@@ -22,11 +22,14 @@ import java.util.List;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ReactiveExecutor;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.AsyncProcessorSupport;
@@ -42,13 +45,17 @@ public class TryProcessor extends AsyncProcessorSupport implements Navigate<Proc
private static final Logger LOG = LoggerFactory.getLogger(TryProcessor.class);
+ protected final CamelContext camelContext;
+ protected final ReactiveExecutor reactiveExecutor;
protected String id;
protected String routeId;
protected final Processor tryProcessor;
protected final List<Processor> catchClauses;
protected final Processor finallyProcessor;
- public TryProcessor(Processor tryProcessor, List<Processor> catchClauses, Processor finallyProcessor) {
+ public TryProcessor(CamelContext camelContext, Processor tryProcessor, List<Processor> catchClauses, Processor finallyProcessor) {
+ this.camelContext = camelContext;
+ this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor();
this.tryProcessor = tryProcessor;
this.catchClauses = catchClauses;
this.finallyProcessor = finallyProcessor;
@@ -66,7 +73,7 @@ public class TryProcessor extends AsyncProcessorSupport implements Navigate<Proc
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
- exchange.getContext().getReactiveExecutor().schedule(new TryState(exchange, callback));
+ reactiveExecutor.schedule(new TryState(exchange, callback));
return false;
}
@@ -95,7 +102,7 @@ public class TryProcessor extends AsyncProcessorSupport implements Navigate<Proc
Processor processor = processors.next();
AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor);
LOG.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
- async.process(exchange, doneSync -> exchange.getContext().getReactiveExecutor().schedule(this));
+ async.process(exchange, doneSync -> reactiveExecutor.schedule(this));
} else {
ExchangeHelper.prepareOutToIn(exchange);
exchange.removeProperty(Exchange.TRY_ROUTE_BLOCK);
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 9eac797..a8bde2d 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -42,6 +42,7 @@ import org.apache.camel.CamelExchangeException;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.Navigate;
import org.apache.camel.NoSuchEndpointException;
@@ -55,6 +56,7 @@ import org.apache.camel.spi.AggregationRepository;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.OptimisticLockingAggregationRepository;
+import org.apache.camel.spi.ReactiveExecutor;
import org.apache.camel.spi.RecoverableAggregationRepository;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.spi.ShutdownAware;
@@ -106,6 +108,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
private volatile Lock lock;
private final AtomicBoolean aggregateRepositoryWarned = new AtomicBoolean();
private final CamelContext camelContext;
+ private final ReactiveExecutor reactiveExecutor;
private final AsyncProcessor processor;
private String id;
private String routeId;
@@ -259,6 +262,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
ObjectHelper.notNull(aggregationStrategy, "aggregationStrategy");
ObjectHelper.notNull(executorService, "executorService");
this.camelContext = camelContext;
+ this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor();
this.processor = processor;
this.correlationExpression = correlationExpression;
this.aggregationStrategy = aggregationStrategy;
@@ -857,7 +861,7 @@ public class AggregateProcessor extends AsyncProcessorSupport implements Navigat
// send this exchange
// the call to schedule last if needed to ensure in-order processing of the aggregates
- executorService.submit(() -> camelContext.getReactiveExecutor().scheduleSync(() -> processor.process(exchange, done -> {
+ executorService.submit(() -> reactiveExecutor.scheduleSync(() -> processor.process(exchange, done -> {
// log exception if there was a problem
if (exchange.getException() != null) {
// if there was an exception then let the exception handler handle it
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 32d61bc..e9bb3bb 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -102,7 +102,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport impleme
ObjectHelper.notNull(redeliveryPolicy, "RedeliveryPolicy", this);
this.camelContext = camelContext;
- this.reactiveExecutor = camelContext.getReactiveExecutor();
+ this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor();
this.awaitManager = camelContext.adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager();
this.shutdownStrategy = camelContext.getShutdownStrategy();
this.redeliveryProcessor = redeliveryProcessor;
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
index 3f1a15e..286ea75 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
@@ -25,6 +25,7 @@ import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Traceable;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.ExchangeHelper;
@@ -164,7 +165,7 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab
@Override
public boolean process(final Exchange exchange, final AsyncCallback callback) {
AsyncProcessor[] processors = doGetProcessors();
- exchange.getContext().getReactiveExecutor().schedule(new State(exchange, callback, processors)::run);
+ exchange.getContext().adapt(ExtendedCamelContext.class).getReactiveExecutor().schedule(new State(exchange, callback, processors)::run);
return false;
}
@@ -251,7 +252,7 @@ public class FailOverLoadBalancer extends LoadBalancerSupport implements Traceab
// process the exchange
LOG.debug("Processing failover at attempt {} for {}", attempts, copy);
- processor.process(copy, doneSync -> exchange.getContext().getReactiveExecutor().schedule(this::run));
+ processor.process(copy, doneSync -> exchange.getContext().adapt(ExtendedCamelContext.class).getReactiveExecutor().schedule(this::run));
}
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
index e8171b7..433a9fd 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
@@ -19,6 +19,7 @@ package org.apache.camel.processor.loadbalancer;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Processor;
/**
@@ -33,7 +34,7 @@ public class TopicLoadBalancer extends LoadBalancerSupport {
@Override
public boolean process(final Exchange exchange, final AsyncCallback callback) {
AsyncProcessor[] processors = doGetProcessors();
- exchange.getContext().getReactiveExecutor().schedule(new State(exchange, callback, processors)::run);
+ exchange.getContext().adapt(ExtendedCamelContext.class).getReactiveExecutor().schedule(new State(exchange, callback, processors)::run);
return false;
}
@@ -64,7 +65,7 @@ public class TopicLoadBalancer extends LoadBalancerSupport {
exchange.setException(current.getException());
callback.done(false);
} else {
- exchange.getContext().getReactiveExecutor().schedule(this::run);
+ exchange.getContext().adapt(ExtendedCamelContext.class).getReactiveExecutor().schedule(this::run);
}
}
}
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/LoopReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/LoopReifier.java
index 434abc0..464599a 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/LoopReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/LoopReifier.java
@@ -43,7 +43,7 @@ public class LoopReifier extends ExpressionReifier<LoopDefinition> {
} else {
expression = definition.getExpression().createExpression(routeContext);
}
- return new LoopProcessor(output, expression, predicate, isCopy);
+ return new LoopProcessor(routeContext.getCamelContext(), output, expression, predicate, isCopy);
}
}
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/TryReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/TryReifier.java
index 4809de5..1a88847 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/TryReifier.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/TryReifier.java
@@ -59,7 +59,7 @@ public class TryReifier extends ProcessorReifier<TryDefinition> {
throw new IllegalArgumentException("doTry must have one or more catch or finally blocks on " + this);
}
- return new TryProcessor(tryProcessor, catchProcessors, finallyProcessor);
+ return new TryProcessor(routeContext.getCamelContext(), tryProcessor, catchProcessors, finallyProcessor);
}
}
diff --git a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
index 4cc4729..e529703 100644
--- a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
+++ b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
@@ -260,7 +260,7 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex
HeadersMapFactory headersMapFactory = getBeanForType(HeadersMapFactory.class);
if (headersMapFactory != null) {
LOG.info("Using custom HeadersMapFactory: {}", headersMapFactory);
- getContext().setHeadersMapFactory(headersMapFactory);
+ getContext().adapt(ExtendedCamelContext.class).setHeadersMapFactory(headersMapFactory);
}
JSonSchemaResolver jsonSchemaResolver = getBeanForType(JSonSchemaResolver.class);
if (jsonSchemaResolver != null) {
@@ -1231,7 +1231,7 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex
ReactiveExecutor reactiveExecutor = getBeanForType(ReactiveExecutor.class);
if (reactiveExecutor != null) {
// already logged in CamelContext
- getContext().setReactiveExecutor(reactiveExecutor);
+ getContext().adapt(ExtendedCamelContext.class).setReactiveExecutor(reactiveExecutor);
}
}
}
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/CustomHeadersMapFactoryRouteTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/CustomHeadersMapFactoryRouteTest.java
index 1d78021..be908ea 100644
--- a/core/camel-core/src/test/java/org/apache/camel/impl/CustomHeadersMapFactoryRouteTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/CustomHeadersMapFactoryRouteTest.java
@@ -21,6 +21,7 @@ import java.util.Map;
import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.spi.HeadersMapFactory;
import org.junit.Test;
@@ -32,7 +33,7 @@ public class CustomHeadersMapFactoryRouteTest extends ContextTestSupport {
@Override
protected CamelContext createCamelContext() throws Exception {
CamelContext context = super.createCamelContext();
- context.setHeadersMapFactory(custom);
+ context.adapt(ExtendedCamelContext.class).setHeadersMapFactory(custom);
return context;
}
@@ -51,7 +52,7 @@ public class CustomHeadersMapFactoryRouteTest extends ContextTestSupport {
assertMockEndpointsSatisfied();
- assertSame(custom, context.getHeadersMapFactory());
+ assertSame(custom, context.adapt(ExtendedCamelContext.class).getHeadersMapFactory());
}
@Override
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/HashMapHeadersMapFactoryRouteTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/HashMapHeadersMapFactoryRouteTest.java
index d906a80..052c7f4 100644
--- a/core/camel-core/src/test/java/org/apache/camel/impl/HashMapHeadersMapFactoryRouteTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/HashMapHeadersMapFactoryRouteTest.java
@@ -21,6 +21,7 @@ import java.util.Map;
import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.engine.HashMapHeadersMapFactory;
import org.junit.Test;
@@ -30,7 +31,7 @@ public class HashMapHeadersMapFactoryRouteTest extends ContextTestSupport {
@Override
protected CamelContext createCamelContext() throws Exception {
CamelContext context = super.createCamelContext();
- context.setHeadersMapFactory(new HashMapHeadersMapFactory());
+ context.adapt(ExtendedCamelContext.class).setHeadersMapFactory(new HashMapHeadersMapFactory());
return context;
}
diff --git a/core/camel-headersmap/src/test/java/org/apache/camel/component/headersmap/CamelFastHeadersMapTest.java b/core/camel-headersmap/src/test/java/org/apache/camel/component/headersmap/CamelFastHeadersMapTest.java
index 9df52ae..616671b 100644
--- a/core/camel-headersmap/src/test/java/org/apache/camel/component/headersmap/CamelFastHeadersMapTest.java
+++ b/core/camel-headersmap/src/test/java/org/apache/camel/component/headersmap/CamelFastHeadersMapTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.headersmap;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.spi.HeadersMapFactory;
@@ -34,7 +35,7 @@ public class CamelFastHeadersMapTest extends CamelTestSupport {
assertMockEndpointsSatisfied();
// should have detected custom and use that
- HeadersMapFactory factory = context.getHeadersMapFactory();
+ HeadersMapFactory factory = context.adapt(ExtendedCamelContext.class).getHeadersMapFactory();
assertIsInstanceOf(FastHeadersMapFactory.class, factory);
}
diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
index f9cb86d..2b7841a 100644
--- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
+++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
@@ -143,7 +143,7 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti
@Override
public String getHeadersMapFactoryClassName() {
- return context.getHeadersMapFactory().getClass().getName();
+ return context.adapt(ExtendedCamelContext.class).getHeadersMapFactory().getClass().getName();
}
@Override
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
index 4e86dd6..ac78c4d 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
@@ -29,6 +29,7 @@ import org.apache.camel.CamelExecutionException;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.Message;
import org.apache.camel.MessageHistory;
@@ -132,7 +133,7 @@ public final class DefaultExchange implements ExtendedExchange {
return null;
}
- return context.getHeadersMapFactory().newMap(headers);
+ return context.adapt(ExtendedCamelContext.class).getHeadersMapFactory().newMap(headers);
}
@SuppressWarnings("unchecked")
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessage.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessage.java
index 9a4da71..392191c 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessage.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessage.java
@@ -23,6 +23,7 @@ import java.util.function.Supplier;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.spi.HeadersMapFactory;
import org.apache.camel.util.ObjectHelper;
@@ -212,11 +213,11 @@ public class DefaultMessage extends MessageSupport {
public void setHeaders(Map<String, Object> headers) {
ObjectHelper.notNull(getCamelContext(), "CamelContext", this);
- if (getCamelContext().getHeadersMapFactory().isInstanceOf(headers)) {
+ if (getCamelContext().adapt(ExtendedCamelContext.class).getHeadersMapFactory().isInstanceOf(headers)) {
this.headers = headers;
} else {
// create a new map
- this.headers = getCamelContext().getHeadersMapFactory().newMap(headers);
+ this.headers = getCamelContext().adapt(ExtendedCamelContext.class).getHeadersMapFactory().newMap(headers);
}
}
@@ -247,7 +248,7 @@ public class DefaultMessage extends MessageSupport {
protected Map<String, Object> createHeaders() {
ObjectHelper.notNull(getCamelContext(), "CamelContext", this);
- Map<String, Object> map = getCamelContext().getHeadersMapFactory().newMap();
+ Map<String, Object> map = getCamelContext().adapt(ExtendedCamelContext.class).getHeadersMapFactory().newMap();
populateInitialHeaders(map);
return map;
}