You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/12/21 08:51:02 UTC
[1/5] camel git commit: CAMEL-8165: Async routing engine - Add
insight into threads blocked waiting for callbacks
Repository: camel
Updated Branches:
refs/heads/master 5017a5a48 -> bf19896c0
CAMEL-8165: Async routing engine - Add insight into threads blocked waiting for callbacks
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/057fb60e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/057fb60e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/057fb60e
Branch: refs/heads/master
Commit: 057fb60e90be001f7a1bbeffdbda12d5047c76eb
Parents: 5017a5a
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Dec 19 16:34:10 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Dec 20 13:33:11 2014 +0100
----------------------------------------------------------------------
.../java/org/apache/camel/CamelContext.java | 15 ++
.../management/mbean/CamelOpenMBeanTypes.java | 10 ++
.../ManagedAsyncProcessorAwaitManagerMBean.java | 41 +++++
.../impl/DefaultAsyncProcessorAwaitManager.java | 169 +++++++++++++++++++
.../apache/camel/impl/DefaultCamelContext.java | 11 ++
.../DefaultManagementLifecycleStrategy.java | 4 +
.../ManagedAsyncProcessorAwaitManager.java | 105 ++++++++++++
.../camel/spi/AsyncProcessorAwaitManager.java | 110 ++++++++++++
.../apache/camel/util/AsyncProcessorHelper.java | 13 +-
.../impl/MultipleLifecycleStrategyTest.java | 2 +-
...roducerRouteAddRemoveRegisterAlwaysTest.java | 6 +-
.../management/ManagedRouteAddRemoveTest.java | 42 ++---
.../camel/processor/ThroughPutLoggerTest.java | 10 +-
13 files changed, 505 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/057fb60e/camel-core/src/main/java/org/apache/camel/CamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/CamelContext.java b/camel-core/src/main/java/org/apache/camel/CamelContext.java
index 9906047..90edf00 100644
--- a/camel-core/src/main/java/org/apache/camel/CamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/CamelContext.java
@@ -30,6 +30,7 @@ import org.apache.camel.model.DataFormatDefinition;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.model.RoutesDefinition;
import org.apache.camel.model.rest.RestDefinition;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.CamelContextNameStrategy;
import org.apache.camel.spi.ClassResolver;
import org.apache.camel.spi.DataFormat;
@@ -1188,6 +1189,20 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration {
void setInflightRepository(InflightRepository repository);
/**
+ * Gets the {@link org.apache.camel.AsyncProcessor} await manager.
+ *
+ * @return the manager
+ */
+ AsyncProcessorAwaitManager getAsyncProcessorAwaitManager();
+
+ /**
+ * Sets a custom {@link org.apache.camel.AsyncProcessor} await manager.
+ *
+ * @param manager the manager
+ */
+ void setAsyncProcessorAwaitManager(AsyncProcessorAwaitManager manager);
+
+ /**
* Gets the the application context class loader which may be helpful for running camel in other containers
*
* @return the application context class loader
http://git-wip-us.apache.org/repos/asf/camel/blob/057fb60e/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
index 5261b39..7c88dd9 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
@@ -88,5 +88,15 @@ public final class CamelOpenMBeanTypes {
new OpenType[]{SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING});
}
+    /**
+     * Creates the tabular type used for listing the threads blocked by the routing engine.
+     * Rows use the composite type from {@link #listAwaitThreadsCompositeType()} and are
+     * indexed by the <tt>name</tt> item.
+     */
+    public static TabularType listAwaitThreadsTabularType() throws OpenDataException {
+        final CompositeType rowType = listAwaitThreadsCompositeType();
+        final String[] indexNames = {"name"};
+        return new TabularType("listAwaitThreads", "Lists blocked threads by the routing engine", rowType, indexNames);
+    }
+
+    /**
+     * Creates the composite (row) type describing a single blocked thread. It has three
+     * string items: <tt>name</tt>, <tt>exchangeId</tt> and <tt>duration</tt>.
+     */
+    public static CompositeType listAwaitThreadsCompositeType() throws OpenDataException {
+        final String[] itemNames = {"name", "exchangeId", "duration"};
+        final String[] itemDescriptions = {"Thread name", "ExchangeId", "Duration"};
+        final OpenType[] itemTypes = {SimpleType.STRING, SimpleType.STRING, SimpleType.STRING};
+        return new CompositeType("threads", "Threads", itemNames, itemDescriptions, itemTypes);
+    }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/057fb60e/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAsyncProcessorAwaitManagerMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAsyncProcessorAwaitManagerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAsyncProcessorAwaitManagerMBean.java
new file mode 100644
index 0000000..bb5b669
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAsyncProcessorAwaitManagerMBean.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.api.management.mbean;
+
+import javax.management.openmbean.TabularData;
+
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedOperation;
+
+/**
+ * JMX management interface for the await manager of the async routing engine, exposing
+ * the threads that are blocked waiting for an asynchronous callback.
+ */
+public interface ManagedAsyncProcessorAwaitManagerMBean extends ManagedServiceMBean {
+
+ /**
+ * Whether blocked threads are interrupted while the manager is stopping.
+ */
+ @ManagedAttribute(description = "Whether to interrupt any blocking threads during stopping.")
+ boolean isInterruptThreadsWhileStopping();
+
+ /**
+ * Sets whether blocked threads are interrupted while the manager is stopping.
+ *
+ * @param interruptThreadsWhileStopping <tt>true</tt> to interrupt blocked threads during stopping
+ */
+ @ManagedAttribute(description = "Whether to interrupt any blocking threads during stopping.")
+ void setInterruptThreadsWhileStopping(boolean interruptThreadsWhileStopping);
+
+ /**
+ * Number of threads currently blocked waiting for the callback of their exchange.
+ */
+ @ManagedAttribute(description = "Number of threads that are blocked waiting for other threads to trigger the callback when they are done processing the exchange")
+ int getSize();
+
+ /**
+ * Lists the inflight exchanges that have a blocked thread waiting for the callback.
+ *
+ * @return the blocked threads as tabular data
+ */
+ @ManagedOperation(description = "Lists all the exchanges which are currently inflight, having a blocked thread awaiting for other threads to trigger the callback when they are done")
+ TabularData browse();
+
+ /**
+ * Forces the exchange with the given id to continue, releasing its blocked thread.
+ *
+ * @param exchangeId the id of the exchange to interrupt
+ */
+ @ManagedOperation(description = "To interrupt an exchange which may seem as stuck, to force the exchange to continue, allowing any blocking thread to be released.")
+ void interrupt(String exchangeId);
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/057fb60e/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
new file mode 100644
index 0000000..9d78260
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
+import org.apache.camel.support.ServiceSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default {@link AsyncProcessorAwaitManager} which keeps an in-memory registry of the threads
+ * that are blocked on a {@link CountDownLatch}, keyed by the {@link Exchange} they are waiting for.
+ */
+public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements AsyncProcessorAwaitManager {
+
+    // TODO: capture message history of the exchange when it was interrupted
+    // TODO: capture route id, node id where thread is blocked
+    // TODO: rename to AsyncInflightRepository?
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultAsyncProcessorAwaitManager.class);
+
+    // the exchanges which currently have a thread blocked in await(..)
+    private final Map<Exchange, AwaitThread> inflight = new ConcurrentHashMap<Exchange, AwaitThread>();
+
+    private boolean interruptThreadsWhileStopping = true;
+
+    /**
+     * Registers the current thread as blocked for the given exchange and waits on the latch.
+     * <p/>
+     * If the wait is interrupted, the {@link InterruptedException} is set on the exchange.
+     * The registration is always removed again when this method returns.
+     *
+     * @param exchange the exchange
+     * @param latch    the latch to wait on
+     */
+    @Override
+    public void await(Exchange exchange, CountDownLatch latch) {
+        LOG.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}",
+                exchange.getExchangeId(), exchange);
+        try {
+            inflight.put(exchange, new AwaitThreadEntry(Thread.currentThread(), exchange, latch));
+            latch.await();
+            LOG.trace("Asynchronous callback received, will continue routing exchangeId: {} -> {}",
+                    exchange.getExchangeId(), exchange);
+
+        } catch (InterruptedException e) {
+            LOG.trace("Interrupted while waiting for callback, will continue routing exchangeId: {} -> {}",
+                    exchange.getExchangeId(), exchange);
+            exchange.setException(e);
+        } finally {
+            inflight.remove(exchange);
+        }
+    }
+
+    /**
+     * Counts down the latch so the thread blocked in {@link #await(Exchange, CountDownLatch)} can continue.
+     *
+     * @param exchange the exchange
+     * @param latch    the latch to count down
+     */
+    @Override
+    public void countDown(Exchange exchange, CountDownLatch latch) {
+        LOG.trace("Asynchronous callback received for exchangeId: {}", exchange.getExchangeId());
+        latch.countDown();
+    }
+
+    @Override
+    public int size() {
+        return inflight.size();
+    }
+
+    /**
+     * Returns an unmodifiable view of the currently registered blocked threads.
+     */
+    @Override
+    public Collection<AwaitThread> browse() {
+        return Collections.unmodifiableCollection(inflight.values());
+    }
+
+    /**
+     * Releases the thread blocked for the given exchange, if any, by setting a
+     * {@link RejectedExecutionException} on the exchange and counting down its latch.
+     * Does nothing if no thread is registered for the exchange.
+     *
+     * @param exchange the exchange to interrupt
+     */
+    @Override
+    public void interrupt(Exchange exchange) {
+        // only AwaitThreadEntry instances are ever put into the map, see await(..)
+        AwaitThreadEntry entry = (AwaitThreadEntry) inflight.get(exchange);
+        if (entry != null) {
+            LOG.warn("Interrupted while waiting for asynchronous callback, will continue routing exchangeId: {} -> {}",
+                    exchange.getExchangeId(), exchange);
+            exchange.setException(new RejectedExecutionException("Interrupted while waiting for asynchronous callback"));
+            entry.getLatch().countDown();
+        }
+    }
+
+    public boolean isInterruptThreadsWhileStopping() {
+        return interruptThreadsWhileStopping;
+    }
+
+    public void setInterruptThreadsWhileStopping(boolean interruptThreadsWhileStopping) {
+        this.interruptThreadsWhileStopping = interruptThreadsWhileStopping;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        // noop
+    }
+
+    /**
+     * Logs any threads that are still blocked and, when {@link #isInterruptThreadsWhileStopping()}
+     * is enabled, interrupts each of them. The registry is cleared afterwards.
+     */
+    @Override
+    protected void doStop() throws Exception {
+        Collection<AwaitThread> threads = browse();
+        int count = threads.size();
+        if (count > 0) {
+            LOG.warn("Shutting down while there are still " + count + " inflight threads currently blocked.");
+
+            StringBuilder sb = new StringBuilder();
+            for (AwaitThread entry : threads) {
+                if (sb.length() > 0) {
+                    // one blocked thread per line, otherwise the entries run together in the log
+                    sb.append("\n");
+                }
+                sb.append("\tBlocked thread: ").append(entry.getBlockedThread().getName())
+                    .append(", exchangeId=").append(entry.getExchange().getExchangeId())
+                    .append(", duration=").append(entry.getWaitDuration()).append(" msec.");
+            }
+
+            if (isInterruptThreadsWhileStopping()) {
+                LOG.warn("The following threads are blocked and will be interrupted so the threads are released:\n" + sb.toString());
+                for (AwaitThread entry : threads) {
+                    try {
+                        interrupt(entry.getExchange());
+                    } catch (Throwable e) {
+                        // best effort: keep releasing the remaining threads
+                        LOG.warn("Error while interrupting thread: " + entry.getBlockedThread().getName() + ". This exception is ignored.", e);
+                    }
+                }
+            } else {
+                LOG.warn("The following threads are blocked, and may reside in the JVM:\n" + sb.toString());
+            }
+        } else {
+            LOG.debug("Shutting down with no inflight threads.");
+        }
+
+        inflight.clear();
+    }
+
+    /**
+     * Registry entry holding the blocked thread, its exchange, the latch it waits on and
+     * the time the entry was created.
+     */
+    private static final class AwaitThreadEntry implements AwaitThread {
+        private final Thread thread;
+        private final Exchange exchange;
+        private final CountDownLatch latch;
+        private final long start;
+
+        private AwaitThreadEntry(Thread thread, Exchange exchange, CountDownLatch latch) {
+            this.thread = thread;
+            this.exchange = exchange;
+            this.latch = latch;
+            this.start = System.currentTimeMillis();
+        }
+
+        @Override
+        public Thread getBlockedThread() {
+            return thread;
+        }
+
+        @Override
+        public Exchange getExchange() {
+            return exchange;
+        }
+
+        @Override
+        public long getWaitDuration() {
+            // millis elapsed since this entry was created
+            return System.currentTimeMillis() - start;
+        }
+
+        public CountDownLatch getLatch() {
+            return latch;
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/057fb60e/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index 7eb7fe6..7e526f5 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -94,6 +94,7 @@ import org.apache.camel.processor.interceptor.Delayer;
import org.apache.camel.processor.interceptor.HandleFault;
import org.apache.camel.processor.interceptor.StreamCaching;
import org.apache.camel.processor.interceptor.Tracer;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.CamelContextNameStrategy;
import org.apache.camel.spi.ClassResolver;
import org.apache.camel.spi.ComponentResolver;
@@ -231,6 +232,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
private InterceptStrategy defaultBacklogTracer;
private InterceptStrategy defaultBacklogDebugger;
private InflightRepository inflightRepository = new DefaultInflightRepository();
+ private AsyncProcessorAwaitManager asyncProcessorAwaitManager = new DefaultAsyncProcessorAwaitManager();
private RuntimeEndpointRegistry runtimeEndpointRegistry = new DefaultRuntimeEndpointRegistry();
private final List<RouteStartupOrder> routeStartupOrder = new ArrayList<RouteStartupOrder>();
// start auto assigning route ids using numbering 1000 and upwards
@@ -2082,6 +2084,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
doAddService(executorServiceManager, false);
addService(producerServicePool);
addService(inflightRepository);
+ addService(asyncProcessorAwaitManager);
addService(shutdownStrategy);
addService(packageScanClassResolver);
addService(restRegistry);
@@ -2972,6 +2975,14 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
this.inflightRepository = repository;
}
+ // returns the await manager currently held by this context
+ public AsyncProcessorAwaitManager getAsyncProcessorAwaitManager() {
+ return asyncProcessorAwaitManager;
+ }
+
+ // replaces the await manager held by this context with the given one
+ public void setAsyncProcessorAwaitManager(AsyncProcessorAwaitManager asyncProcessorAwaitManager) {
+ this.asyncProcessorAwaitManager = asyncProcessorAwaitManager;
+ }
+
public void setAutoStartup(Boolean autoStartup) {
this.autoStartup = autoStartup;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/057fb60e/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
index 3e44b60..6291545 100644
--- a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
@@ -51,6 +51,7 @@ import org.apache.camel.impl.EndpointRegistry;
import org.apache.camel.impl.EventDrivenConsumerRoute;
import org.apache.camel.impl.ProducerCache;
import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
+import org.apache.camel.management.mbean.ManagedAsyncProcessorAwaitManager;
import org.apache.camel.management.mbean.ManagedBacklogDebugger;
import org.apache.camel.management.mbean.ManagedBacklogTracer;
import org.apache.camel.management.mbean.ManagedCamelContext;
@@ -78,6 +79,7 @@ import org.apache.camel.processor.CamelInternalProcessor;
import org.apache.camel.processor.interceptor.BacklogDebugger;
import org.apache.camel.processor.interceptor.BacklogTracer;
import org.apache.camel.processor.interceptor.Tracer;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.EventNotifier;
import org.apache.camel.spi.LifecycleStrategy;
import org.apache.camel.spi.ManagementAgent;
@@ -466,6 +468,8 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement
answer = new ManagedTypeConverterRegistry(context, (TypeConverterRegistry) service);
} else if (service instanceof RestRegistry) {
answer = new ManagedRestRegistry(context, (RestRegistry) service);
+ } else if (service instanceof AsyncProcessorAwaitManager) {
+ answer = new ManagedAsyncProcessorAwaitManager(context, (AsyncProcessorAwaitManager) service);
} else if (service instanceof RuntimeEndpointRegistry) {
answer = new ManagedRuntimeEndpointRegistry(context, (RuntimeEndpointRegistry) service);
} else if (service instanceof StreamCachingStrategy) {
http://git-wip-us.apache.org/repos/asf/camel/blob/057fb60e/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java
new file mode 100644
index 0000000..bd32b4e
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management.mbean;
+
+import java.util.Collection;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.api.management.mbean.CamelOpenMBeanTypes;
+import org.apache.camel.api.management.mbean.ManagedAsyncProcessorAwaitManagerMBean;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * JMX managed view of an {@link AsyncProcessorAwaitManager}, delegating every attribute and
+ * operation to the wrapped manager.
+ */
+@ManagedResource(description = "Managed AsyncProcessorAwaitManager")
+public class ManagedAsyncProcessorAwaitManager extends ManagedService implements ManagedAsyncProcessorAwaitManagerMBean {
+
+    private final AsyncProcessorAwaitManager manager;
+
+    public ManagedAsyncProcessorAwaitManager(CamelContext context, AsyncProcessorAwaitManager manager) {
+        super(context, manager);
+        this.manager = manager;
+    }
+
+    public AsyncProcessorAwaitManager getAsyncProcessorAwaitManager() {
+        return manager;
+    }
+
+    @Override
+    public boolean isInterruptThreadsWhileStopping() {
+        return manager.isInterruptThreadsWhileStopping();
+    }
+
+    @Override
+    public void setInterruptThreadsWhileStopping(boolean interruptThreadsWhileStopping) {
+        manager.setInterruptThreadsWhileStopping(interruptThreadsWhileStopping);
+    }
+
+    @Override
+    public int getSize() {
+        return manager.size();
+    }
+
+    /**
+     * Lists the blocked threads as tabular data, with one row per thread holding the thread
+     * name, the exchange id and the wait duration (as a string).
+     *
+     * @return the tabular data, empty if no threads are blocked
+     */
+    @Override
+    public TabularData browse() {
+        try {
+            TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.listAwaitThreadsTabularType());
+            // the row type and item names are the same for every row, so build them once
+            CompositeType ct = CamelOpenMBeanTypes.listAwaitThreadsCompositeType();
+            String[] itemNames = new String[]{"name", "exchangeId", "duration"};
+
+            Collection<AsyncProcessorAwaitManager.AwaitThread> threads = manager.browse();
+            for (AsyncProcessorAwaitManager.AwaitThread entry : threads) {
+                String name = entry.getBlockedThread().getName();
+                String exchangeId = entry.getExchange().getExchangeId();
+                String duration = "" + entry.getWaitDuration();
+
+                CompositeData data = new CompositeDataSupport(ct, itemNames, new Object[]{name, exchangeId, duration});
+                answer.put(data);
+            }
+            return answer;
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
+    /**
+     * Interrupts the blocked exchange with the given id. Does nothing if the id is
+     * <tt>null</tt> or no blocked exchange has that id.
+     *
+     * @param exchangeId the id of the exchange to interrupt
+     */
+    @Override
+    public void interrupt(String exchangeId) {
+        if (exchangeId == null) {
+            // a JMX client may invoke the operation without an argument; nothing can match
+            return;
+        }
+
+        // need to find the exchange with the given exchange id
+        Exchange found = null;
+        for (AsyncProcessorAwaitManager.AwaitThread entry : manager.browse()) {
+            Exchange exchange = entry.getExchange();
+            if (exchangeId.equals(exchange.getExchangeId())) {
+                found = exchange;
+                break;
+            }
+        }
+
+        if (found != null) {
+            manager.interrupt(found);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/057fb60e/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
new file mode 100644
index 0000000..7780e3f
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.StaticService;
+
+/**
+ * A manager for the async routing engine, used when {@link Exchange}s are handed over from one thread to another while
+ * the calling thread is blocked waiting for the other thread to complete before it can continue.
+ * <p/>
+ * This manager offers insight into the state, and allows forcing stuck exchanges and their blocked threads to continue
+ * in case of malfunctions.
+ */
+public interface AsyncProcessorAwaitManager extends StaticService {
+
+ /**
+ * Information about the thread and exchange that are inflight.
+ */
+ interface AwaitThread {
+
+ /**
+ * The thread which is blocked waiting for other threads to signal the callback.
+ */
+ Thread getBlockedThread();
+
+ /**
+ * The exchange being processed by the other thread.
+ */
+ Exchange getExchange();
+
+ /**
+ * Time in millis the thread has been blocked waiting for the signal.
+ */
+ long getWaitDuration();
+ }
+
+ /**
+ * Registers the exchange to await the callback being triggered by another thread which has taken over processing
+ * this exchange. The current thread blocks until that callback happens.
+ *
+ * @param exchange the exchange
+ * @param latch the latch used to wait for the other thread to signal when it is done
+ */
+ void await(Exchange exchange, CountDownLatch latch);
+
+ /**
+ * Triggered when the other thread is done processing the exchange, to signal the waiting thread that it can take
+ * over control and continue processing the exchange.
+ *
+ * @param exchange the exchange
+ * @param latch the latch used to wait for the other thread to signal when it is done
+ */
+ void countDown(Exchange exchange, CountDownLatch latch);
+
+ /**
+ * Number of threads that are blocked waiting for other threads to trigger the callback when they are done processing
+ * the exchange.
+ */
+ int size();
+
+ /**
+ * A <i>read-only</i> browser of the {@link AwaitThread}s that are currently inflight.
+ */
+ Collection<AwaitThread> browse();
+
+ /**
+ * To interrupt an exchange which seems to be stuck, forcing the exchange to continue
+ * and allowing any blocked thread to be released.
+ * <p/>
+ * <b>Important:</b> Use this with caution, as the other thread is still assumed to be processing the exchange.
+ * If the exchange appears to be <i>stuck</i>, this method can remedy that by forcing the latch to count down
+ * so the blocked thread can continue. An exception is set on the exchange, which allows Camel's error handler to deal
+ * with the malfunctioning exchange.
+ *
+ * @param exchange the exchange to interrupt.
+ */
+ void interrupt(Exchange exchange);
+
+ /**
+ * Whether to interrupt any blocking threads during stopping.
+ * <p/>
+ * This is enabled by default, which allows Camel to release any blocked threads while Camel itself is shutting down.
+ */
+ boolean isInterruptThreadsWhileStopping();
+
+ /**
+ * Sets whether to interrupt any blocking threads during stopping.
+ * <p/>
+ * This is enabled by default, which allows Camel to release any blocked threads while Camel itself is shutting down.
+ */
+ void setInterruptThreadsWhileStopping(boolean interruptThreadsWhileStopping);
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/057fb60e/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java b/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
index 2115d3a..cfdfbcc 100644
--- a/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
@@ -17,9 +17,11 @@
package org.apache.camel.util;
import java.util.concurrent.CountDownLatch;
+
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.UnitOfWork;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -101,12 +103,13 @@ public final class AsyncProcessorHelper {
* @throws Exception can be thrown if waiting is interrupted
*/
public static void process(final AsyncProcessor processor, final Exchange exchange) throws Exception {
+ final AsyncProcessorAwaitManager awaitManager = exchange.getContext().getAsyncProcessorAwaitManager();
+
final CountDownLatch latch = new CountDownLatch(1);
boolean sync = processor.process(exchange, new AsyncCallback() {
public void done(boolean doneSync) {
if (!doneSync) {
- LOG.trace("Asynchronous callback received for exchangeId: {}", exchange.getExchangeId());
- latch.countDown();
+ awaitManager.countDown(exchange, latch);
}
}
@@ -116,11 +119,7 @@ public final class AsyncProcessorHelper {
}
});
if (!sync) {
- LOG.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}",
- exchange.getExchangeId(), exchange);
- latch.await();
- LOG.trace("Asynchronous callback received, will continue routing exchangeId: {} -> {}",
- exchange.getExchangeId(), exchange);
+ awaitManager.await(exchange, latch);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/057fb60e/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java b/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
index 0907c1f..a3cf751 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
@@ -51,7 +51,7 @@ public class MultipleLifecycleStrategyTest extends TestSupport {
context.stop();
List<String> expectedEvents = Arrays.asList("onContextStart", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd",
- "onServiceAdd", "onServiceAdd", "onServiceAdd", "onComponentAdd", "onEndpointAdd", "onComponentRemove", "onContextStop");
+ "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onComponentAdd", "onEndpointAdd", "onComponentRemove", "onContextStop");
assertEquals(expectedEvents, dummy1.getEvents());
assertEquals(expectedEvents, dummy2.getEvents());
http://git-wip-us.apache.org/repos/asf/camel/blob/057fb60e/camel-core/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
index a8201e3..86ec502 100644
--- a/camel-core/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
@@ -47,7 +47,7 @@ public class ManagedProducerRouteAddRemoveRegisterAlwaysTest extends ManagementT
// number of services
Set<ObjectName> names = mbeanServer.queryNames(on, null);
- assertEquals(8, names.size());
+ assertEquals(9, names.size());
// number of producers
ObjectName onP = ObjectName.getInstance("org.apache.camel:context=camel-1,type=producers,*");
@@ -72,7 +72,7 @@ public class ManagedProducerRouteAddRemoveRegisterAlwaysTest extends ManagementT
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(8, names.size());
+ assertEquals(9, names.size());
// but as its recipient list which is dynamic-to we add new producers because we have register always
namesP = mbeanServer.queryNames(onP, null);
@@ -87,7 +87,7 @@ public class ManagedProducerRouteAddRemoveRegisterAlwaysTest extends ManagementT
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(8, names.size());
+ assertEquals(9, names.size());
// and we still have the other producers, but not the one from the 2nd route that was removed
namesP = mbeanServer.queryNames(onP, null);
http://git-wip-us.apache.org/repos/asf/camel/blob/057fb60e/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
index 4ba5c04..46656de 100644
--- a/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
@@ -59,7 +59,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// number of services
ObjectName on = ObjectName.getInstance("org.apache.camel:context=camel-1,type=services,*");
Set<ObjectName> names = mbeanServer.queryNames(on, null);
- assertEquals(8, names.size());
+ assertEquals(9, names.size());
// number of producers
ObjectName onP = ObjectName.getInstance("org.apache.camel:context=camel-1,type=producers,*");
@@ -84,7 +84,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(8, names.size());
+ assertEquals(9, names.size());
// but we should have one more producer
namesP = mbeanServer.queryNames(onP, null);
@@ -99,7 +99,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(8, names.size());
+ assertEquals(9, names.size());
// and the 2nd producer should be removed
namesP = mbeanServer.queryNames(onP, null);
@@ -119,7 +119,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// number of services
Set<ObjectName> names = mbeanServer.queryNames(on, null);
- assertEquals(8, names.size());
+ assertEquals(9, names.size());
// number of producers
ObjectName onP = ObjectName.getInstance("org.apache.camel:context=camel-1,type=producers,*");
@@ -144,7 +144,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(8, names.size());
+ assertEquals(9, names.size());
// but as its recipient list which is dynamic-to we do not add a new producer
namesP = mbeanServer.queryNames(onP, null);
@@ -159,7 +159,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(8, names.size());
+ assertEquals(9, names.size());
// and we still have the original producer
namesP = mbeanServer.queryNames(onP, null);
@@ -179,7 +179,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// number of services
Set<ObjectName> names = mbeanServer.queryNames(on, null);
- assertEquals(8, names.size());
+ assertEquals(9, names.size());
// number of producers
ObjectName onP = ObjectName.getInstance("org.apache.camel:context=camel-1,type=producers,*");
@@ -204,7 +204,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(8, names.size());
+ assertEquals(9, names.size());
// but as its recipient list which is dynamic-to we do not add a new producer
namesP = mbeanServer.queryNames(onP, null);
@@ -219,7 +219,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(8, names.size());
+ assertEquals(9, names.size());
// and we still have the original producer
namesP = mbeanServer.queryNames(onP, null);
@@ -239,7 +239,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// number of services
Set<ObjectName> names = mbeanServer.queryNames(on, null);
- assertEquals(8, names.size());
+ assertEquals(9, names.size());
log.info("Adding 2nd route");
@@ -269,7 +269,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(8, names.size());
+ assertEquals(9, names.size());
// now stop and remove the 2nd route
log.info("Stopping 2nd route");
@@ -281,7 +281,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(8, names.size());
+ assertEquals(9, names.size());
log.info("Shutting down...");
}
@@ -297,7 +297,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// number of services
Set<ObjectName> names = mbeanServer.queryNames(on, null);
- assertEquals(8, names.size());
+ assertEquals(9, names.size());
log.info("Adding 2nd route");
@@ -328,7 +328,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(8, names.size());
+ assertEquals(9, names.size());
// now stop and remove the 2nd route
log.info("Stopping 2nd route");
@@ -340,7 +340,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(8, names.size());
+ assertEquals(9, names.size());
log.info("Shutting down...");
}
@@ -356,7 +356,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// number of services
Set<ObjectName> names = mbeanServer.queryNames(on, null);
- assertEquals(8, names.size());
+ assertEquals(9, names.size());
log.info("Adding 2nd route");
@@ -385,7 +385,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(8, names.size());
+ assertEquals(9, names.size());
// now stop and remove the 2nd route
log.info("Stopping 2nd route");
@@ -397,7 +397,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(8, names.size());
+ assertEquals(9, names.size());
log.info("Shutting down...");
}
@@ -413,7 +413,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// number of services
Set<ObjectName> names = mbeanServer.queryNames(on, null);
- assertEquals(8, names.size());
+ assertEquals(9, names.size());
log.info("Adding 2nd route");
@@ -443,7 +443,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(8, names.size());
+ assertEquals(9, names.size());
// now stop and remove the 2nd route
log.info("Stopping 2nd route");
@@ -455,7 +455,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
// there should still be the same number of services
names = mbeanServer.queryNames(on, null);
- assertEquals(8, names.size());
+ assertEquals(9, names.size());
log.info("Shutting down...");
}
http://git-wip-us.apache.org/repos/asf/camel/blob/057fb60e/camel-core/src/test/java/org/apache/camel/processor/ThroughPutLoggerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThroughPutLoggerTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThroughPutLoggerTest.java
index 9a3601d..8b6f667 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThroughPutLoggerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThroughPutLoggerTest.java
@@ -17,6 +17,9 @@
package org.apache.camel.processor;
import junit.framework.TestCase;
+import org.apache.camel.CamelContext;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.util.CamelLogger;
import org.easymock.EasyMock;
import org.slf4j.Logger;
@@ -25,6 +28,9 @@ import org.slf4j.Marker;
public class ThroughPutLoggerTest extends TestCase {
public void testLogStringDurationIsNotZero() throws Exception {
+ CamelContext camel = new DefaultCamelContext();
+ camel.start();
+
Logger logger = EasyMock.createMock(Logger.class);
logger.isInfoEnabled();
EasyMock.expectLastCall().andReturn(true).atLeastOnce();
@@ -36,8 +42,10 @@ public class ThroughPutLoggerTest extends TestCase {
ThroughputLogger underTest = new ThroughputLogger(new CamelLogger(logger));
underTest.setGroupSize(10);
for (int i = 0; i < 25; i++) {
- underTest.process(null);
+ underTest.process(new DefaultExchange(camel));
}
EasyMock.verify(logger);
+
+ camel.stop();
}
}
[4/5] camel git commit: CAMEL-8165: Async routing engine - Add
insight into threads blocked waiting for callbacks
Posted by da...@apache.org.
CAMEL-8165: Async routing engine - Add insight into threads blocked waiting for callbacks
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cd6cd485
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cd6cd485
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cd6cd485
Branch: refs/heads/master
Commit: cd6cd48551cdbffb59c12fd0903749397583c623
Parents: 9e010df
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Dec 20 18:21:09 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Dec 20 18:21:09 2014 +0100
----------------------------------------------------------------------
.../processor/async/AsyncProcessorAwaitManagerInterruptTest.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/cd6cd485/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
index d78f7f1..2d97c80 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
@@ -60,7 +60,7 @@ public class AsyncProcessorAwaitManagerInterruptTest extends ContextTestSupport
from("direct:start").routeId("myRoute")
.to("mock:before")
- .to("async:bye:camel").id("myAsync")
+ .to("async:bye:camel?delay=500").id("myAsync")
.to("mock:after")
.process(new Processor() {
@Override
[2/5] camel git commit: CAMEL-8165: Async routing engine - Add
insight into threads blocked waiting for callbacks
Posted by da...@apache.org.
CAMEL-8165: Async routing engine - Add insight into threads blocked waiting for callbacks
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/36c2d70d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/36c2d70d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/36c2d70d
Branch: refs/heads/master
Commit: 36c2d70d5f952e4b8eaa9cf9ed0b3af26e5f66ef
Parents: 057fb60
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Dec 20 13:55:07 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Dec 20 14:35:09 2014 +0100
----------------------------------------------------------------------
.../management/mbean/CamelOpenMBeanTypes.java | 8 +-
.../impl/DefaultAsyncProcessorAwaitManager.java | 120 ++++++++++++++++---
.../ManagedAsyncProcessorAwaitManager.java | 24 ++--
.../camel/spi/AsyncProcessorAwaitManager.java | 28 +++++
...AsyncProcessorAwaitManagerInterruptTest.java | 86 +++++++++++++
.../async/AsyncProcessorAwaitManagerTest.java | 80 +++++++++++++
6 files changed, 311 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/36c2d70d/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
index 7c88dd9..070f5c8 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
@@ -90,13 +90,13 @@ public final class CamelOpenMBeanTypes {
public static TabularType listAwaitThreadsTabularType() throws OpenDataException {
CompositeType ct = listAwaitThreadsCompositeType();
- return new TabularType("listAwaitThreads", "Lists blocked threads by the routing engine", ct, new String[]{"name"});
+ return new TabularType("listAwaitThreads", "Lists blocked threads by the routing engine", ct, new String[]{"id"});
}
public static CompositeType listAwaitThreadsCompositeType() throws OpenDataException {
- return new CompositeType("threads", "Threads", new String[]{"name", "exchangeId", "duration"},
- new String[]{"Thread name", "ExchangeId", "Duration"},
- new OpenType[]{SimpleType.STRING, SimpleType.STRING, SimpleType.STRING});
+ return new CompositeType("threads", "Threads", new String[]{"id", "name", "exchangeId", "routeId", "nodeId", "duration"},
+ new String[]{"Thread Id", "Thread name", "ExchangeId", "RouteId", "NodeId", "Duration"},
+ new OpenType[]{SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING});
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/36c2d70d/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
index 9d78260..20c2927 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
@@ -18,29 +18,41 @@ package org.apache.camel.impl;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import org.apache.camel.Exchange;
+import org.apache.camel.MessageHistory;
+import org.apache.camel.processor.DefaultExchangeFormatter;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
+import org.apache.camel.spi.ExchangeFormatter;
import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.MessageHelper;
+import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements AsyncProcessorAwaitManager {
- // TODO: capture message history of the exchange when it was interrupted
- // TODO: capture route id, node id where thread is blocked
- // TODO: rename to AsyncInflightRepository?
-
private static final Logger LOG = LoggerFactory.getLogger(DefaultAsyncProcessorAwaitManager.class);
private final Map<Exchange, AwaitThread> inflight = new ConcurrentHashMap<Exchange, AwaitThread>();
-
+ private final ExchangeFormatter exchangeFormatter;
private boolean interruptThreadsWhileStopping = true;
+ public DefaultAsyncProcessorAwaitManager() {
+ // setup exchange formatter to be used for message history dump
+ DefaultExchangeFormatter formatter = new DefaultExchangeFormatter();
+ formatter.setShowExchangeId(true);
+ formatter.setMultiline(true);
+ formatter.setShowHeaders(true);
+ formatter.setStyle(DefaultExchangeFormatter.OutputStyle.Fixed);
+ this.exchangeFormatter = formatter;
+ }
+
@Override
public void await(Exchange exchange, CountDownLatch latch) {
LOG.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}",
@@ -77,13 +89,47 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
}
@Override
+ public void interrupt(String exchangeId) {
+ // need to find the exchange with the given exchange id
+ Exchange found = null;
+ for (AsyncProcessorAwaitManager.AwaitThread entry : browse()) {
+ Exchange exchange = entry.getExchange();
+ if (exchangeId.equals(exchange.getExchangeId())) {
+ found = exchange;
+ break;
+ }
+ }
+
+ if (found != null) {
+ interrupt(found);
+ }
+ }
+
+ @Override
public void interrupt(Exchange exchange) {
- AwaitThreadEntry latch = (AwaitThreadEntry) inflight.get(exchange);
- if (latch != null) {
- LOG.warn("Interrupted while waiting for asynchronous callback, will continue routing exchangeId: {} -> {}",
- exchange.getExchangeId(), exchange);
- exchange.setException(new RejectedExecutionException("Interrupted while waiting for asynchronous callback"));
- latch.getLatch().countDown();
+ AwaitThreadEntry entry = (AwaitThreadEntry) inflight.get(exchange);
+ if (entry != null) {
+ try {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Interrupted while waiting for asynchronous callback, will release the following blocked thread which was waiting for exchange to finish processing with exchangeId: ");
+ sb.append(exchange.getExchangeId());
+ sb.append("\n");
+
+ sb.append(dumpBlockedThread(entry));
+
+ // dump a route stack trace of the exchange
+ String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, exchangeFormatter, false);
+ if (routeStackTrace != null) {
+ sb.append(routeStackTrace);
+ }
+ LOG.warn(sb.toString());
+
+ } catch (Exception e) {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ } finally {
+ exchange.setException(new RejectedExecutionException("Interrupted while waiting for asynchronous callback for exchangeId: " + exchange.getExchangeId()));
+ entry.getLatch().countDown();
+ }
}
}
@@ -109,9 +155,7 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
StringBuilder sb = new StringBuilder();
for (AwaitThread entry : threads) {
- sb.append("\tBlocked thread: ").append(entry.getBlockedThread().getName())
- .append(", exchangeId=").append(entry.getExchange().getExchangeId())
- .append(", duration=").append(entry.getWaitDuration()).append(" msec.");
+ sb.append(dumpBlockedThread(entry));
}
if (isInterruptThreadsWhileStopping()) {
@@ -133,17 +177,50 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
inflight.clear();
}
+ private static String dumpBlockedThread(AwaitThread entry) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\n");
+ sb.append("Blocked Thread\n");
+ sb.append("---------------------------------------------------------------------------------------------------------------------------------------\n");
+
+ sb.append(style("Id:")).append(entry.getBlockedThread().getId()).append("\n");
+ sb.append(style("Name:")).append(entry.getBlockedThread().getName()).append("\n");
+ sb.append(style("RouteId:")).append(safeNull(entry.getRouteId())).append("\n");
+ sb.append(style("NodeId:")).append(safeNull(entry.getNodeId())).append("\n");
+ sb.append(style("Duration:")).append(entry.getWaitDuration()).append(" msec.\n");
+ return sb.toString();
+ }
+
+ private static String style(String label) {
+ return String.format("\t%-20s", label);
+ }
+
+ private static String safeNull(Object value) {
+ return value != null ? value.toString() : "";
+ }
+
private static final class AwaitThreadEntry implements AwaitThread {
private final Thread thread;
private final Exchange exchange;
private final CountDownLatch latch;
private final long start;
+ private String routeId;
+ private String nodeId;
private AwaitThreadEntry(Thread thread, Exchange exchange, CountDownLatch latch) {
this.thread = thread;
this.exchange = exchange;
this.latch = latch;
this.start = System.currentTimeMillis();
+
+ // capture details from message history if enabled
+ List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
+ if (list != null && !list.isEmpty()) {
+ // grab last part
+ MessageHistory history = list.get(list.size() - 1);
+ routeId = history.getRouteId();
+ nodeId = history.getNode() != null ? history.getNode().getId() : null;
+ }
}
@Override
@@ -161,9 +238,24 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
return System.currentTimeMillis() - start;
}
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public String getNodeId() {
+ return nodeId;
+ }
+
public CountDownLatch getLatch() {
return latch;
}
+
+ @Override
+ public String toString() {
+ return "AwaitThreadEntry[name=" + thread.getName() + ", exchangeId=" + exchange.getExchangeId() + "]";
+ }
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/36c2d70d/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java
index bd32b4e..a4759ef 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java
@@ -24,7 +24,6 @@ import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.api.management.mbean.CamelOpenMBeanTypes;
import org.apache.camel.api.management.mbean.ManagedAsyncProcessorAwaitManagerMBean;
@@ -70,13 +69,16 @@ public class ManagedAsyncProcessorAwaitManager extends ManagedService implements
Collection<AsyncProcessorAwaitManager.AwaitThread> threads = manager.browse();
for (AsyncProcessorAwaitManager.AwaitThread entry : threads) {
CompositeType ct = CamelOpenMBeanTypes.listAwaitThreadsCompositeType();
+ String id = "" + entry.getBlockedThread().getId();
String name = entry.getBlockedThread().getName();
String exchangeId = entry.getExchange().getExchangeId();
+ String routeId = entry.getRouteId();
+ String nodeId = entry.getNodeId();
String duration = "" + entry.getWaitDuration();
- CompositeData data = new CompositeDataSupport(ct, new String[]
- {"name", "exchangeId", "duration"},
- new Object[]{name, exchangeId, duration});
+ CompositeData data = new CompositeDataSupport(ct,
+ new String[]{"id", "name", "exchangeId", "routeId", "nodeId", "duration"},
+ new Object[]{id, name, exchangeId, routeId, nodeId, duration});
answer.put(data);
}
return answer;
@@ -87,19 +89,7 @@ public class ManagedAsyncProcessorAwaitManager extends ManagedService implements
@Override
public void interrupt(String exchangeId) {
- // need to find the exchange with the given exchange id
- Exchange found = null;
- for (AsyncProcessorAwaitManager.AwaitThread entry : manager.browse()) {
- Exchange exchange = entry.getExchange();
- if (exchangeId.equals(exchange.getExchangeId())) {
- found = exchange;
- break;
- }
- }
-
- if (found != null) {
- manager.interrupt(found);
- }
+ manager.interrupt(exchangeId);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/36c2d70d/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
index 7780e3f..0b39c1c 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
@@ -50,6 +50,21 @@ public interface AsyncProcessorAwaitManager extends StaticService {
* Time in millis the thread has been blocked waiting for the signal.
*/
long getWaitDuration();
+
+ /**
+ * The id of the route where the exchange was processed when the thread was set to block.
+ * <p/>
+ * Is <tt>null</tt> if message history is disabled.
+ */
+ String getRouteId();
+
+ /**
+ * The id of the node from the route where the exchange was processed when the thread was set to block.
+ * <p/>
+ * Is <tt>null</tt> if message history is disabled.
+ */
+ String getNodeId();
+
}
/**
@@ -90,6 +105,19 @@ public interface AsyncProcessorAwaitManager extends StaticService {
* so the blocked thread can continue. An exception is set on the exchange which allows Camel's error handler to deal
* with this malfunctioned exchange.
*
+ * @param exchangeId the exchange id to interrupt.
+ */
+ void interrupt(String exchangeId);
+
+ /**
+ * To interrupt an exchange which may seem stuck, to force the exchange to continue,
+ * allowing any blocking thread to be released.
+ * <p/>
+ * <b>Important:</b> Use this with caution as the other thread is still assumed to be processing the exchange. Though
+ * if it appears that the exchange is <i>stuck</i>, then this method can remedy this, by forcing the latch to count down
+ * so the blocked thread can continue. An exception is set on the exchange which allows Camel's error handler to deal
+ * with this malfunctioned exchange.
+ *
* @param exchange the exchange to interrupt.
*/
void interrupt(Exchange exchange);
http://git-wip-us.apache.org/repos/asf/camel/blob/36c2d70d/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
new file mode 100644
index 0000000..d78f7f1
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import java.util.Collection;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
+
+/**
+ * @version
+ */
+public class AsyncProcessorAwaitManagerInterruptTest extends ContextTestSupport {
+
+ public void testAsyncAwaitInterrupt() throws Exception {
+ assertEquals(0, context.getAsyncProcessorAwaitManager().size());
+
+ getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+ getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
+ getMockEndpoint("mock:result").expectedMessageCount(0);
+
+ try {
+ template.requestBody("direct:start", "Hello Camel", String.class);
+ fail("Should have thrown exception");
+ } catch (CamelExecutionException e) {
+ RejectedExecutionException cause = assertIsInstanceOf(RejectedExecutionException.class, e.getCause());
+ assertTrue(cause.getMessage().startsWith("Interrupted while waiting for asynchronous callback"));
+ }
+
+ assertMockEndpointsSatisfied();
+
+ assertEquals(0, context.getAsyncProcessorAwaitManager().size());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ context.addComponent("async", new MyAsyncComponent());
+
+ from("direct:start").routeId("myRoute")
+ .to("mock:before")
+ .to("async:bye:camel").id("myAsync")
+ .to("mock:after")
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ int size = context.getAsyncProcessorAwaitManager().size();
+ log.info("async inflight: {}", size);
+ assertEquals(1, size);
+
+ Collection<AsyncProcessorAwaitManager.AwaitThread> threads = context.getAsyncProcessorAwaitManager().browse();
+ AsyncProcessorAwaitManager.AwaitThread thread = threads.iterator().next();
+
+ // lets interrupt it
+ String id = thread.getExchange().getExchangeId();
+ context.getAsyncProcessorAwaitManager().interrupt(id);
+ }
+ })
+ .transform(constant("Hi Camel"))
+ .to("mock:result");
+ }
+ };
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/36c2d70d/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
new file mode 100644
index 0000000..8389069
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import java.util.Collection;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
+
+/**
+ * Verifies that the AsyncProcessorAwaitManager of the CamelContext reports exactly one awaiting thread, with its route id and node id, while an exchange is processed after the async endpoint, and none before and after the request.
+ */
+public class AsyncProcessorAwaitManagerTest extends ContextTestSupport {
+
+ public void testAsyncAwait() throws Exception {
+ assertEquals(0, context.getAsyncProcessorAwaitManager().size()); // nothing is awaiting before a message is sent
+
+ getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+ getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel"); // body differs after the async endpoint; presumably changed by MyAsyncComponent (not shown here)
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+
+ String reply = template.requestBody("direct:start", "Hello Camel", String.class);
+ assertEquals("Bye Camel", reply);
+
+ assertMockEndpointsSatisfied();
+
+ assertEquals(0, context.getAsyncProcessorAwaitManager().size()); // the manager must be empty again once the request has returned
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception { // builds the direct:start route whose processor inspects the await manager
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ context.addComponent("async", new MyAsyncComponent()); // registers the component behind the async: endpoint used below
+
+ from("direct:start").routeId("myRoute")
+ .to("mock:before")
+ .to("async:bye:camel").id("myAsync")
+ .to("mock:after")
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ int size = context.getAsyncProcessorAwaitManager().size();
+ log.info("async inflight: {}", size);
+ assertEquals(1, size); // exactly one thread is expected to be awaiting while this processor runs
+
+ Collection<AsyncProcessorAwaitManager.AwaitThread> threads = context.getAsyncProcessorAwaitManager().browse();
+ AsyncProcessorAwaitManager.AwaitThread thread = threads.iterator().next(); // first entry; the assert above expects it to be the only one
+
+ long wait = thread.getWaitDuration();
+ log.info("Thread {} has waited for {} msec.", thread.getBlockedThread().getName(), wait);
+
+ assertEquals("myRoute", thread.getRouteId()); // matches routeId(...) given to the route above
+ assertEquals("myAsync", thread.getNodeId()); // matches id(...) given to the async endpoint above
+ }
+ })
+ .to("mock:result");
+ }
+ };
+ }
+
+}
\ No newline at end of file
[5/5] camel git commit: CAMEL-8165: Async routing engine - Add
insight into threads blocked waiting for callbacks
Posted by da...@apache.org.
CAMEL-8165: Async routing engine - Add insight into threads blocked waiting for callbacks
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bf19896c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bf19896c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bf19896c
Branch: refs/heads/master
Commit: bf19896c0cfce610d704b5f6eb626875afe5af76
Parents: cd6cd48
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Dec 21 08:14:08 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Dec 21 08:50:45 2014 +0100
----------------------------------------------------------------------
.../main/java/org/apache/camel/impl/DefaultCamelContext.java | 3 +++
.../java/org/apache/camel/spi/AsyncProcessorAwaitManager.java | 5 +++--
.../apache/camel/core/xml/AbstractCamelContextFactoryBean.java | 6 ++++++
3 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/bf19896c/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index 7e526f5..eb784b6 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -2167,6 +2167,9 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
}
getRouteStartupOrder().clear();
+ // shutdown await manager to trigger interrupt of blocked threads to attempt to free these threads gracefully
+ shutdownServices(asyncProcessorAwaitManager);
+
shutdownServices(routeServices.values());
// do not clear route services or startup listeners as we can start Camel again and get the route back as before
http://git-wip-us.apache.org/repos/asf/camel/blob/bf19896c/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
index 0b39c1c..3081449 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
@@ -26,8 +26,9 @@ import org.apache.camel.StaticService;
* A manager to handle async routing engine, when {@link Exchange}s are being handed over from one thread to another, while
* the callee thread is blocked waiting for the other threads to complete, before it can continue.
* <p/>
- * This manager offers insight into the state, and allow to force stuck exchanges to be continued and threads to continue,
- * in case of malfunctions.
+ * This manager offers insight into the state, and allows forcing stuck exchanges to be continued and blocked threads
+ * to be unblocked. Threads may become stuck in case of severe malfunctions (such as the system running out of memory,
+ * a 3rd party never responding, or a timeout not triggering, etc).
*/
public interface AsyncProcessorAwaitManager extends StaticService {
http://git-wip-us.apache.org/repos/asf/camel/blob/bf19896c/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
----------------------------------------------------------------------
diff --git a/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java b/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
index 32b84a8..8aca563 100644
--- a/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
+++ b/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
@@ -69,6 +69,7 @@ import org.apache.camel.processor.interceptor.BacklogTracer;
import org.apache.camel.processor.interceptor.HandleFault;
import org.apache.camel.processor.interceptor.TraceFormatter;
import org.apache.camel.processor.interceptor.Tracer;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.ClassResolver;
import org.apache.camel.spi.Debugger;
import org.apache.camel.spi.EndpointStrategy;
@@ -204,6 +205,11 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex
LOG.info("Using custom InflightRepository: {}", inflightRepository);
getContext().setInflightRepository(inflightRepository);
}
+ AsyncProcessorAwaitManager asyncProcessorAwaitManager = getBeanForType(AsyncProcessorAwaitManager.class);
+ if (asyncProcessorAwaitManager != null) {
+ LOG.info("Using custom AsyncProcessorAwaitManager: {}", asyncProcessorAwaitManager);
+ getContext().setAsyncProcessorAwaitManager(asyncProcessorAwaitManager);
+ }
ManagementStrategy managementStrategy = getBeanForType(ManagementStrategy.class);
if (managementStrategy != null) {
LOG.info("Using custom ManagementStrategy: {}", managementStrategy);
[3/5] camel git commit: Fixed CS
Posted by da...@apache.org.
Fixed CS
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9e010df0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9e010df0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9e010df0
Branch: refs/heads/master
Commit: 9e010df0fb7a5be9f41c8d1440dbeac4a0c65c6b
Parents: 36c2d70
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Dec 20 14:36:00 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Dec 20 14:36:00 2014 +0100
----------------------------------------------------------------------
.../src/main/java/org/apache/camel/component/bean/MethodInfo.java | 3 ---
1 file changed, 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/9e010df0/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java b/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
index aaf3cf0..61f0882 100644
--- a/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
+++ b/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
@@ -478,20 +478,17 @@ public class MethodInfo {
// evaluate the parameter value binding
value = evaluateParameterValue(exchange, i, parameterValue, parameterType);
}
-
// use bean parameter binding, if still no value
Expression expression = expressions[i];
if (value == null && expression != null) {
value = evaluateParameterBinding(exchange, expression, i, parameterType);
}
}
-
// remember the value to use
if (value != Void.TYPE) {
answer[i] = value;
}
}
-
return (T) answer;
}