You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/04/02 12:50:07 UTC
[camel] 03/09: CAMEL-15105: make the AsyncProcessorAwaitManager a plugin of the context
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 501498d9c8620dd825f94bb177ff63719f2d18d9
Author: Otavio Rodolfo Piske <an...@gmail.com>
AuthorDate: Sat Apr 1 15:58:05 2023 +0200
CAMEL-15105: make the AsyncProcessorAwaitManager a plugin of the context
---
.../java/org/apache/camel/ExtendedCamelContext.java | 16 ----------------
.../camel/impl/engine/AbstractCamelContext.java | 8 ++------
.../impl/engine/DefaultCamelContextExtension.java | 19 -------------------
.../impl/engine/SharedCamelInternalProcessor.java | 3 ++-
.../apache/camel/impl/console/BlockedConsole.java | 5 +++--
.../camel/impl/ExtendedCamelContextConfigurer.java | 6 ------
.../impl/lw/LightweightCamelContextExtension.java | 13 -------------
.../errorhandler/RedeliveryErrorHandler.java | 2 +-
.../core/xml/AbstractCamelContextFactoryBean.java | 2 +-
.../processor/async/AsyncEndpointPolicyTest.java | 3 ++-
.../AsyncProcessorAwaitManagerInterruptTest.java | 21 ++++++++++++---------
...ssorAwaitManagerInterruptWithRedeliveryTest.java | 19 +++++++++++--------
.../async/AsyncProcessorAwaitManagerTest.java | 16 +++++++++-------
.../camel/main/DefaultConfigurationConfigurer.java | 2 +-
.../apache/camel/support/AsyncProcessorHelper.java | 2 +-
.../apache/camel/support/AsyncProcessorSupport.java | 2 +-
.../apache/camel/support/DefaultAsyncProducer.java | 2 +-
.../java/org/apache/camel/support/PluginHelper.java | 20 +++++++++++++++++++-
18 files changed, 66 insertions(+), 95 deletions(-)
diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
index e8346448418..16dbcc3c229 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
@@ -23,7 +23,6 @@ import java.util.function.Supplier;
import org.apache.camel.catalog.RuntimeCamelCatalog;
import org.apache.camel.spi.AnnotationBasedProcessorFactory;
-import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.BeanIntrospection;
import org.apache.camel.spi.BeanProcessorFactory;
import org.apache.camel.spi.BeanProxyFactory;
@@ -49,7 +48,6 @@ import org.apache.camel.spi.ResourceLoader;
import org.apache.camel.spi.RestBindingJaxbDataFormatFactory;
import org.apache.camel.spi.RouteController;
import org.apache.camel.spi.RouteStartupOrder;
-import org.apache.camel.spi.RoutesLoader;
import org.apache.camel.spi.StartupStepRecorder;
import org.apache.camel.spi.UnitOfWorkFactory;
@@ -384,20 +382,6 @@ public interface ExtendedCamelContext {
*/
void addLogListener(LogListener listener);
- /**
- * Gets the {@link org.apache.camel.AsyncProcessor} await manager.
- *
- * @return the manager
- */
- AsyncProcessorAwaitManager getAsyncProcessorAwaitManager();
-
- /**
- * Sets a custom {@link org.apache.camel.AsyncProcessor} await manager.
- *
- * @param manager the manager
- */
- void setAsyncProcessorAwaitManager(AsyncProcessorAwaitManager manager);
-
/**
* Gets the {@link BeanIntrospection}
*/
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index a177273fd49..36aecfecdc5 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -221,7 +221,6 @@ public abstract class AbstractCamelContext extends BaseService
volatile ModelToXMLDumper modelToXMLDumper;
volatile RestBindingJaxbDataFormatFactory restBindingJaxbDataFormatFactory;
volatile RuntimeCamelCatalog runtimeCamelCatalog;
- volatile AsyncProcessorAwaitManager asyncProcessorAwaitManager;
volatile UnitOfWorkFactory unitOfWorkFactory;
volatile BeanIntrospection beanIntrospection;
volatile boolean eventNotificationApplicable;
@@ -381,6 +380,7 @@ public abstract class AbstractCamelContext extends BaseService
camelContextExtension.lazyAddContextPlugin(InterceptEndpointFactory.class, this::createInterceptEndpointFactory);
camelContextExtension.lazyAddContextPlugin(RouteFactory.class, this::createRouteFactory);
camelContextExtension.lazyAddContextPlugin(RoutesLoader.class, this::createRoutesLoader);
+ camelContextExtension.lazyAddContextPlugin(AsyncProcessorAwaitManager.class, this::createAsyncProcessorAwaitManager);
if (build) {
try {
@@ -2878,6 +2878,7 @@ public abstract class AbstractCamelContext extends BaseService
// shutdown await manager to trigger interrupt of blocked threads to
// attempt to free these threads graceful
+ final AsyncProcessorAwaitManager asyncProcessorAwaitManager = PluginHelper.getAsyncProcessorAwaitManager(this);
InternalServiceManager.shutdownServices(this, asyncProcessorAwaitManager);
// we need also to include routes which failed to start to ensure all resources get stopped when stopping Camel
@@ -3286,7 +3287,6 @@ public abstract class AbstractCamelContext extends BaseService
typeConverterRegistry = null;
typeConverter = null;
reactiveExecutor = null;
- asyncProcessorAwaitManager = null;
exchangeFactory = null;
exchangeFactoryManager = null;
processorExchangeFactory = null;
@@ -4171,10 +4171,6 @@ public abstract class AbstractCamelContext extends BaseService
camelContextExtension.addInterceptStrategy(interceptStrategy);
}
- public AsyncProcessorAwaitManager getAsyncProcessorAwaitManager() {
- return camelContextExtension.getAsyncProcessorAwaitManager();
- }
-
public BeanIntrospection getBeanIntrospection() {
return camelContextExtension.getBeanIntrospection();
}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java
index b981927d3cb..dc0532c2898 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java
@@ -37,7 +37,6 @@ import org.apache.camel.RuntimeCamelException;
import org.apache.camel.Service;
import org.apache.camel.catalog.RuntimeCamelCatalog;
import org.apache.camel.spi.AnnotationBasedProcessorFactory;
-import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.BeanIntrospection;
import org.apache.camel.spi.BeanProcessorFactory;
import org.apache.camel.spi.BeanProxyFactory;
@@ -407,24 +406,6 @@ class DefaultCamelContextExtension implements ExtendedCamelContext {
}
}
- @Override
- public AsyncProcessorAwaitManager getAsyncProcessorAwaitManager() {
- if (camelContext.asyncProcessorAwaitManager == null) {
- synchronized (camelContext.lock) {
- if (camelContext.asyncProcessorAwaitManager == null) {
- setAsyncProcessorAwaitManager(camelContext.createAsyncProcessorAwaitManager());
- }
- }
- }
- return camelContext.asyncProcessorAwaitManager;
- }
-
- @Override
- public void setAsyncProcessorAwaitManager(AsyncProcessorAwaitManager asyncProcessorAwaitManager) {
- camelContext.asyncProcessorAwaitManager
- = camelContext.getInternalServiceManager().addService(asyncProcessorAwaitManager);
- }
-
@Override
public BeanIntrospection getBeanIntrospection() {
if (camelContext.beanIntrospection == null) {
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java
index 6ae6ed90103..28f89571d46 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SharedCamelInternalProcessor.java
@@ -37,6 +37,7 @@ import org.apache.camel.spi.Transformer;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.support.AsyncCallbackToCompletableFutureAdapter;
import org.apache.camel.support.OrderedComparator;
+import org.apache.camel.support.PluginHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -82,7 +83,7 @@ public class SharedCamelInternalProcessor implements SharedInternalProcessor {
public SharedCamelInternalProcessor(CamelContext camelContext, CamelInternalProcessorAdvice... advices) {
this.camelContext = camelContext;
this.reactiveExecutor = camelContext.getCamelContextExtension().getReactiveExecutor();
- this.awaitManager = camelContext.getCamelContextExtension().getAsyncProcessorAwaitManager();
+ this.awaitManager = PluginHelper.getAsyncProcessorAwaitManager(camelContext);
this.shutdownStrategy = camelContext.getShutdownStrategy();
if (advices != null) {
diff --git a/core/camel-console/src/main/java/org/apache/camel/impl/console/BlockedConsole.java b/core/camel-console/src/main/java/org/apache/camel/impl/console/BlockedConsole.java
index 339f216f558..8c897dc9b93 100644
--- a/core/camel-console/src/main/java/org/apache/camel/impl/console/BlockedConsole.java
+++ b/core/camel-console/src/main/java/org/apache/camel/impl/console/BlockedConsole.java
@@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.annotations.DevConsole;
+import org.apache.camel.support.PluginHelper;
import org.apache.camel.support.console.AbstractDevConsole;
import org.apache.camel.util.TimeUtils;
import org.apache.camel.util.json.JsonObject;
@@ -37,7 +38,7 @@ public class BlockedConsole extends AbstractDevConsole {
protected String doCallText(Map<String, Object> options) {
StringBuilder sb = new StringBuilder();
- AsyncProcessorAwaitManager am = getCamelContext().getCamelContextExtension().getAsyncProcessorAwaitManager();
+ AsyncProcessorAwaitManager am = PluginHelper.getAsyncProcessorAwaitManager(getCamelContext());
sb.append(String.format("\n Blocked: %s", am.size()));
for (AsyncProcessorAwaitManager.AwaitThread at : am.browse()) {
String age = TimeUtils.printDuration(at.getWaitDuration(), true);
@@ -52,7 +53,7 @@ public class BlockedConsole extends AbstractDevConsole {
protected JsonObject doCallJson(Map<String, Object> options) {
JsonObject root = new JsonObject();
- AsyncProcessorAwaitManager am = getCamelContext().getCamelContextExtension().getAsyncProcessorAwaitManager();
+ AsyncProcessorAwaitManager am = PluginHelper.getAsyncProcessorAwaitManager(getCamelContext());
root.put("blocked", am.size());
final List<JsonObject> list = new ArrayList<>();
diff --git a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
index 77230c6cdf2..5866067408b 100644
--- a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
+++ b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java
@@ -23,8 +23,6 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
switch (ignoreCase ? name.toLowerCase() : name) {
case "annotationbasedprocessorfactory":
case "AnnotationBasedProcessorFactory": target.setAnnotationBasedProcessorFactory(property(camelContext, org.apache.camel.spi.AnnotationBasedProcessorFactory.class, value)); return true;
- case "asyncprocessorawaitmanager":
- case "AsyncProcessorAwaitManager": target.setAsyncProcessorAwaitManager(property(camelContext, org.apache.camel.spi.AsyncProcessorAwaitManager.class, value)); return true;
case "basepackagescan":
case "BasePackageScan": target.setBasePackageScan(property(camelContext, java.lang.String.class, value)); return true;
case "beanintrospection":
@@ -76,8 +74,6 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
switch (ignoreCase ? name.toLowerCase() : name) {
case "annotationbasedprocessorfactory":
case "AnnotationBasedProcessorFactory": return org.apache.camel.spi.AnnotationBasedProcessorFactory.class;
- case "asyncprocessorawaitmanager":
- case "AsyncProcessorAwaitManager": return org.apache.camel.spi.AsyncProcessorAwaitManager.class;
case "basepackagescan":
case "BasePackageScan": return java.lang.String.class;
case "beanintrospection":
@@ -130,8 +126,6 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com
switch (ignoreCase ? name.toLowerCase() : name) {
case "annotationbasedprocessorfactory":
case "AnnotationBasedProcessorFactory": return target.getAnnotationBasedProcessorFactory();
- case "asyncprocessorawaitmanager":
- case "AsyncProcessorAwaitManager": return target.getAsyncProcessorAwaitManager();
case "basepackagescan":
case "BasePackageScan": return target.getBasePackageScan();
case "beanintrospection":
diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContextExtension.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContextExtension.java
index 82469a20b83..1878d61c495 100644
--- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContextExtension.java
+++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContextExtension.java
@@ -38,7 +38,6 @@ import org.apache.camel.Service;
import org.apache.camel.ServiceStatus;
import org.apache.camel.catalog.RuntimeCamelCatalog;
import org.apache.camel.spi.AnnotationBasedProcessorFactory;
-import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.BeanIntrospection;
import org.apache.camel.spi.BeanProcessorFactory;
import org.apache.camel.spi.BeanProxyFactory;
@@ -63,9 +62,7 @@ import org.apache.camel.spi.Registry;
import org.apache.camel.spi.ResourceLoader;
import org.apache.camel.spi.RestBindingJaxbDataFormatFactory;
import org.apache.camel.spi.RouteController;
-import org.apache.camel.spi.RouteFactory;
import org.apache.camel.spi.RouteStartupOrder;
-import org.apache.camel.spi.RoutesLoader;
import org.apache.camel.spi.StartupStepRecorder;
import org.apache.camel.spi.SupervisingRouteController;
import org.apache.camel.spi.UnitOfWorkFactory;
@@ -290,16 +287,6 @@ class LightweightCamelContextExtension implements ExtendedCamelContext {
throw new UnsupportedOperationException();
}
- @Override
- public AsyncProcessorAwaitManager getAsyncProcessorAwaitManager() {
- return camelContext.getCamelContextExtension().getAsyncProcessorAwaitManager();
- }
-
- @Override
- public void setAsyncProcessorAwaitManager(AsyncProcessorAwaitManager manager) {
- throw new UnsupportedOperationException();
- }
-
@Override
public BeanIntrospection getBeanIntrospection() {
return camelContext.getCamelContextExtension().getBeanIntrospection();
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 2e8eba75bd7..688b7cf1cd6 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -118,7 +118,7 @@ public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport
this.camelContext = camelContext;
this.reactiveExecutor = camelContext.getCamelContextExtension().getReactiveExecutor();
- this.awaitManager = camelContext.getCamelContextExtension().getAsyncProcessorAwaitManager();
+ this.awaitManager = PluginHelper.getAsyncProcessorAwaitManager(camelContext);
this.shutdownStrategy = camelContext.getShutdownStrategy();
this.redeliveryProcessor = redeliveryProcessor;
this.deadLetter = deadLetter;
diff --git a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
index faa44550efd..ea03b928510 100644
--- a/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
+++ b/core/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
@@ -267,7 +267,7 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex
AsyncProcessorAwaitManager asyncProcessorAwaitManager = getBeanForType(AsyncProcessorAwaitManager.class);
if (asyncProcessorAwaitManager != null) {
LOG.info("Using custom AsyncProcessorAwaitManager: {}", asyncProcessorAwaitManager);
- getContext().getCamelContextExtension().setAsyncProcessorAwaitManager(asyncProcessorAwaitManager);
+ getContext().getCamelContextExtension().addContextPlugin(AsyncProcessorAwaitManager.class, asyncProcessorAwaitManager);
}
ManagementStrategy managementStrategy = getBeanForType(ManagementStrategy.class);
if (managementStrategy != null) {
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java
index 9522bbc165c..7c3912d4b62 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointPolicyTest.java
@@ -31,6 +31,7 @@ import org.apache.camel.spi.Policy;
import org.apache.camel.spi.Registry;
import org.apache.camel.support.AsyncCallbackToCompletableFutureAdapter;
import org.apache.camel.support.AsyncProcessorConverterHelper;
+import org.apache.camel.support.PluginHelper;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -125,7 +126,7 @@ public class AsyncEndpointPolicyTest extends ContextTestSupport {
public void process(Exchange exchange) throws Exception {
final AsyncProcessorAwaitManager awaitManager
- = exchange.getContext().getCamelContextExtension().getAsyncProcessorAwaitManager();
+ = PluginHelper.getAsyncProcessorAwaitManager(exchange.getContext());
awaitManager.process(this, exchange);
}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
index ec8383aaad5..51db2d95b5a 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
@@ -25,6 +25,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
+import org.apache.camel.support.PluginHelper;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -34,10 +35,10 @@ public class AsyncProcessorAwaitManagerInterruptTest extends ContextTestSupport
@Test
public void testAsyncAwaitInterrupt() throws Exception {
- context.getCamelContextExtension().getAsyncProcessorAwaitManager().getStatistics().setStatisticsEnabled(true);
-
- assertEquals(0, context.getCamelContextExtension().getAsyncProcessorAwaitManager().size());
+ final AsyncProcessorAwaitManager asyncProcessorAwaitManager = PluginHelper.getAsyncProcessorAwaitManager(context);
+ asyncProcessorAwaitManager.getStatistics().setStatisticsEnabled(true);
+ assertEquals(0, asyncProcessorAwaitManager.size());
getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
getMockEndpoint("mock:result").expectedMessageCount(0);
@@ -51,10 +52,10 @@ public class AsyncProcessorAwaitManagerInterruptTest extends ContextTestSupport
assertMockEndpointsSatisfied();
- assertEquals(0, context.getCamelContextExtension().getAsyncProcessorAwaitManager().size());
+ assertEquals(0, asyncProcessorAwaitManager.size());
assertEquals(1,
- context.getCamelContextExtension().getAsyncProcessorAwaitManager().getStatistics().getThreadsBlocked());
- assertEquals(1, context.getCamelContextExtension().getAsyncProcessorAwaitManager().getStatistics()
+ asyncProcessorAwaitManager.getStatistics().getThreadsBlocked());
+ assertEquals(1, asyncProcessorAwaitManager.getStatistics()
.getThreadsInterrupted());
}
@@ -69,17 +70,19 @@ public class AsyncProcessorAwaitManagerInterruptTest extends ContextTestSupport
.to("mock:after").process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
- int size = context.getCamelContextExtension().getAsyncProcessorAwaitManager().size();
+ final AsyncProcessorAwaitManager asyncProcessorAwaitManager
+ = PluginHelper.getAsyncProcessorAwaitManager(context);
+ int size = asyncProcessorAwaitManager.size();
log.info("async inflight: {}", size);
assertEquals(1, size);
Collection<AsyncProcessorAwaitManager.AwaitThread> threads
- = context.getCamelContextExtension().getAsyncProcessorAwaitManager().browse();
+ = asyncProcessorAwaitManager.browse();
AsyncProcessorAwaitManager.AwaitThread thread = threads.iterator().next();
// lets interrupt it
String id = thread.getExchange().getExchangeId();
- context.getCamelContextExtension().getAsyncProcessorAwaitManager().interrupt(id);
+ asyncProcessorAwaitManager.interrupt(id);
}
}).transform(constant("Hi Camel")).to("mock:result");
}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptWithRedeliveryTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptWithRedeliveryTest.java
index 808e39cb562..e8dcf3bee9b 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptWithRedeliveryTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptWithRedeliveryTest.java
@@ -26,6 +26,7 @@ import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.Registry;
+import org.apache.camel.support.PluginHelper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
@@ -50,9 +51,10 @@ public class AsyncProcessorAwaitManagerInterruptWithRedeliveryTest extends Conte
@Test
public void testAsyncAwaitInterrupt() throws Exception {
- context.getCamelContextExtension().getAsyncProcessorAwaitManager().getStatistics().setStatisticsEnabled(true);
+ final AsyncProcessorAwaitManager asyncProcessorAwaitManager = PluginHelper.getAsyncProcessorAwaitManager(context);
+ asyncProcessorAwaitManager.getStatistics().setStatisticsEnabled(true);
- assertEquals(0, context.getCamelContextExtension().getAsyncProcessorAwaitManager().size());
+ assertEquals(0, asyncProcessorAwaitManager.size());
getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
getMockEndpoint("mock:result").expectedMessageCount(0);
@@ -72,10 +74,10 @@ public class AsyncProcessorAwaitManagerInterruptWithRedeliveryTest extends Conte
// Check we have not reached the full 5 re-deliveries
verify(bean, atMost(4)).callMe();
- assertEquals(0, context.getCamelContextExtension().getAsyncProcessorAwaitManager().size());
+ assertEquals(0, asyncProcessorAwaitManager.size());
assertEquals(1,
- context.getCamelContextExtension().getAsyncProcessorAwaitManager().getStatistics().getThreadsBlocked());
- assertEquals(1, context.getCamelContextExtension().getAsyncProcessorAwaitManager().getStatistics()
+ asyncProcessorAwaitManager.getStatistics().getThreadsBlocked());
+ assertEquals(1, asyncProcessorAwaitManager.getStatistics()
.getThreadsInterrupted());
}
@@ -89,16 +91,17 @@ public class AsyncProcessorAwaitManagerInterruptWithRedeliveryTest extends Conte
}
// Get our blocked thread
- int size = context.getCamelContextExtension().getAsyncProcessorAwaitManager().size();
+ final AsyncProcessorAwaitManager asyncProcessorAwaitManager = PluginHelper.getAsyncProcessorAwaitManager(context);
+ int size = asyncProcessorAwaitManager.size();
assertEquals(1, size);
Collection<AsyncProcessorAwaitManager.AwaitThread> threads
- = context.getCamelContextExtension().getAsyncProcessorAwaitManager().browse();
+ = asyncProcessorAwaitManager.browse();
AsyncProcessorAwaitManager.AwaitThread thread = threads.iterator().next();
// Interrupt it
String id = thread.getExchange().getExchangeId();
- context.getCamelContextExtension().getAsyncProcessorAwaitManager().interrupt(id);
+ asyncProcessorAwaitManager.interrupt(id);
}).start();
}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
index c70e060063c..659797fa756 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
@@ -23,6 +23,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
+import org.apache.camel.support.PluginHelper;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
@@ -32,9 +33,10 @@ public class AsyncProcessorAwaitManagerTest extends ContextTestSupport {
@Test
public void testAsyncAwait() throws Exception {
- context.getCamelContextExtension().getAsyncProcessorAwaitManager().getStatistics().setStatisticsEnabled(true);
+ final AsyncProcessorAwaitManager asyncProcessorAwaitManager = PluginHelper.getAsyncProcessorAwaitManager(context);
+ asyncProcessorAwaitManager.getStatistics().setStatisticsEnabled(true);
- assertEquals(0, context.getCamelContextExtension().getAsyncProcessorAwaitManager().size());
+ assertEquals(0, asyncProcessorAwaitManager.size());
getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
@@ -45,10 +47,10 @@ public class AsyncProcessorAwaitManagerTest extends ContextTestSupport {
assertMockEndpointsSatisfied();
- assertEquals(0, context.getCamelContextExtension().getAsyncProcessorAwaitManager().size());
+ assertEquals(0, asyncProcessorAwaitManager.size());
assertEquals(1,
- context.getCamelContextExtension().getAsyncProcessorAwaitManager().getStatistics().getThreadsBlocked());
- assertEquals(0, context.getCamelContextExtension().getAsyncProcessorAwaitManager().getStatistics()
+ asyncProcessorAwaitManager.getStatistics().getThreadsBlocked());
+ assertEquals(0, asyncProcessorAwaitManager.getStatistics()
.getThreadsInterrupted());
}
@@ -63,12 +65,12 @@ public class AsyncProcessorAwaitManagerTest extends ContextTestSupport {
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
- int size = context.getCamelContextExtension().getAsyncProcessorAwaitManager().size();
+ int size = PluginHelper.getAsyncProcessorAwaitManager(context).size();
log.info("async inflight: {}", size);
assertEquals(1, size);
Collection<AsyncProcessorAwaitManager.AwaitThread> threads
- = context.getCamelContextExtension().getAsyncProcessorAwaitManager().browse();
+ = PluginHelper.getAsyncProcessorAwaitManager(context).browse();
AsyncProcessorAwaitManager.AwaitThread thread = threads.iterator().next();
long wait = thread.getWaitDuration();
diff --git a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
index cfd445143f5..d1cc0adc905 100644
--- a/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
+++ b/core/camel-main/src/main/java/org/apache/camel/main/DefaultConfigurationConfigurer.java
@@ -383,7 +383,7 @@ public final class DefaultConfigurationConfigurer {
}
AsyncProcessorAwaitManager apam = getSingleBeanOfType(registry, AsyncProcessorAwaitManager.class);
if (apam != null) {
- ecc.getCamelContextExtension().setAsyncProcessorAwaitManager(apam);
+ ecc.getCamelContextExtension().addContextPlugin(AsyncProcessorAwaitManager.class, apam);
}
ManagementStrategy ms = getSingleBeanOfType(registry, ManagementStrategy.class);
if (ms != null) {
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/AsyncProcessorHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/AsyncProcessorHelper.java
index 6052d2f6dd4..19ae9d8f26c 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/AsyncProcessorHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/AsyncProcessorHelper.java
@@ -43,7 +43,7 @@ public final class AsyncProcessorHelper {
*/
public static void process(final AsyncProcessor processor, final Exchange exchange) throws Exception {
final AsyncProcessorAwaitManager awaitManager
- = exchange.getContext().getCamelContextExtension().getAsyncProcessorAwaitManager();
+ = PluginHelper.getAsyncProcessorAwaitManager(exchange.getContext());
awaitManager.process(processor, exchange);
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/AsyncProcessorSupport.java b/core/camel-support/src/main/java/org/apache/camel/support/AsyncProcessorSupport.java
index b693d3f1263..eba0c4ebd02 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/AsyncProcessorSupport.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/AsyncProcessorSupport.java
@@ -36,7 +36,7 @@ public abstract class AsyncProcessorSupport extends ServiceSupport implements As
@Override
public void process(Exchange exchange) throws Exception {
AsyncProcessorAwaitManager awaitManager
- = exchange.getContext().getCamelContextExtension().getAsyncProcessorAwaitManager();
+ = PluginHelper.getAsyncProcessorAwaitManager(exchange.getContext());
awaitManager.process(this, exchange);
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultAsyncProducer.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultAsyncProducer.java
index 06df2020b51..24372acc978 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultAsyncProducer.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultAsyncProducer.java
@@ -36,7 +36,7 @@ public abstract class DefaultAsyncProducer extends DefaultProducer implements As
@Override
public void process(Exchange exchange) throws Exception {
AsyncProcessorAwaitManager awaitManager
- = exchange.getContext().getCamelContextExtension().getAsyncProcessorAwaitManager();
+ = PluginHelper.getAsyncProcessorAwaitManager(exchange.getContext());
awaitManager.process(this, exchange);
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java
index deebc08f9d1..4870a9e22f5 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java
@@ -23,6 +23,7 @@ import org.apache.camel.CamelContext;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.console.DevConsoleResolver;
import org.apache.camel.health.HealthCheckResolver;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.CamelBeanPostProcessor;
import org.apache.camel.spi.CamelDependencyInjectionAnnotationFactory;
import org.apache.camel.spi.ComponentNameResolver;
@@ -430,11 +431,28 @@ public final class PluginHelper {
return getRoutesLoader(camelContext.getCamelContextExtension());
}
-
/**
* Gets the {@link RoutesLoader} to be used.
*/
public static RoutesLoader getRoutesLoader(ExtendedCamelContext extendedCamelContext) {
return extendedCamelContext.getContextPlugin(RoutesLoader.class);
}
+
+ /**
+ * Gets the {@link org.apache.camel.AsyncProcessor} await manager.
+ *
+ * @return the manager
+ */
+ public static AsyncProcessorAwaitManager getAsyncProcessorAwaitManager(CamelContext camelContext) {
+ return getAsyncProcessorAwaitManager(camelContext.getCamelContextExtension());
+ }
+
+ /**
+ * Gets the {@link org.apache.camel.AsyncProcessor} await manager.
+ *
+ * @return the manager
+ */
+ public static AsyncProcessorAwaitManager getAsyncProcessorAwaitManager(ExtendedCamelContext extendedCamelContext) {
+ return extendedCamelContext.getContextPlugin(AsyncProcessorAwaitManager.class);
+ }
}