You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/12/21 08:51:02 UTC

[1/5] camel git commit: CAMEL-8165: Async routing engine - Add insight into threads blocked waiting for callbacks

Repository: camel
Updated Branches:
  refs/heads/master 5017a5a48 -> bf19896c0


CAMEL-8165: Async routing engine - Add insight into threads blocked waiting for callbacks


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/057fb60e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/057fb60e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/057fb60e

Branch: refs/heads/master
Commit: 057fb60e90be001f7a1bbeffdbda12d5047c76eb
Parents: 5017a5a
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Dec 19 16:34:10 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Dec 20 13:33:11 2014 +0100

----------------------------------------------------------------------
 .../java/org/apache/camel/CamelContext.java     |  15 ++
 .../management/mbean/CamelOpenMBeanTypes.java   |  10 ++
 .../ManagedAsyncProcessorAwaitManagerMBean.java |  41 +++++
 .../impl/DefaultAsyncProcessorAwaitManager.java | 169 +++++++++++++++++++
 .../apache/camel/impl/DefaultCamelContext.java  |  11 ++
 .../DefaultManagementLifecycleStrategy.java     |   4 +
 .../ManagedAsyncProcessorAwaitManager.java      | 105 ++++++++++++
 .../camel/spi/AsyncProcessorAwaitManager.java   | 110 ++++++++++++
 .../apache/camel/util/AsyncProcessorHelper.java |  13 +-
 .../impl/MultipleLifecycleStrategyTest.java     |   2 +-
 ...roducerRouteAddRemoveRegisterAlwaysTest.java |   6 +-
 .../management/ManagedRouteAddRemoveTest.java   |  42 ++---
 .../camel/processor/ThroughPutLoggerTest.java   |  10 +-
 13 files changed, 505 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/057fb60e/camel-core/src/main/java/org/apache/camel/CamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/CamelContext.java b/camel-core/src/main/java/org/apache/camel/CamelContext.java
index 9906047..90edf00 100644
--- a/camel-core/src/main/java/org/apache/camel/CamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/CamelContext.java
@@ -30,6 +30,7 @@ import org.apache.camel.model.DataFormatDefinition;
 import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.model.RoutesDefinition;
 import org.apache.camel.model.rest.RestDefinition;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.CamelContextNameStrategy;
 import org.apache.camel.spi.ClassResolver;
 import org.apache.camel.spi.DataFormat;
@@ -1188,6 +1189,20 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration {
     void setInflightRepository(InflightRepository repository);
 
     /**
+     * Gets the {@link org.apache.camel.AsyncProcessor} await manager.
+     *
+     * @return the manager
+     */
+    AsyncProcessorAwaitManager getAsyncProcessorAwaitManager();
+
+    /**
+     * Sets a custom {@link org.apache.camel.AsyncProcessor} await manager.
+     *
+     * @param manager the manager
+     */
+    void setAsyncProcessorAwaitManager(AsyncProcessorAwaitManager manager);
+
+    /**
      * Gets the the application context class loader which may be helpful for running camel in other containers
      *
      * @return the application context class loader

http://git-wip-us.apache.org/repos/asf/camel/blob/057fb60e/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
index 5261b39..7c88dd9 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
@@ -88,5 +88,15 @@ public final class CamelOpenMBeanTypes {
                 new OpenType[]{SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING});
     }
 
+    public static TabularType listAwaitThreadsTabularType() throws OpenDataException {
+        CompositeType ct = listAwaitThreadsCompositeType();
+        return new TabularType("listAwaitThreads", "Lists blocked threads by the routing engine", ct, new String[]{"name"});
+    }
+
+    public static CompositeType listAwaitThreadsCompositeType() throws OpenDataException {
+        return new CompositeType("threads", "Threads", new String[]{"name", "exchangeId", "duration"},
+                new String[]{"Thread name", "ExchangeId", "Duration"},
+                new OpenType[]{SimpleType.STRING, SimpleType.STRING, SimpleType.STRING});
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/057fb60e/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAsyncProcessorAwaitManagerMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAsyncProcessorAwaitManagerMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAsyncProcessorAwaitManagerMBean.java
new file mode 100644
index 0000000..bb5b669
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAsyncProcessorAwaitManagerMBean.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.api.management.mbean;
+
+import javax.management.openmbean.TabularData;
+
+import org.apache.camel.api.management.ManagedAttribute;
+import org.apache.camel.api.management.ManagedOperation;
+
+public interface ManagedAsyncProcessorAwaitManagerMBean extends ManagedServiceMBean {
+
+    @ManagedAttribute(description = "Whether to interrupt any blocking threads during stopping.")
+    boolean isInterruptThreadsWhileStopping();
+
+    @ManagedAttribute(description = "Whether to interrupt any blocking threads during stopping.")
+    void setInterruptThreadsWhileStopping(boolean interruptThreadsWhileStopping);
+
+    @ManagedAttribute(description = "Number of threads that are blocked waiting for other threads to trigger the callback when they are done processing the exchange")
+    int getSize();
+
+    @ManagedOperation(description = "Lists all the exchanges which are currently inflight, having a blocked thread awaiting for other threads to trigger the callback when they are done")
+    TabularData browse();
+
+    @ManagedOperation(description = "To interrupt an exchange which may seem as stuck, to force the exchange to continue, allowing any blocking thread to be released.")
+    void interrupt(String exchangeId);
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/057fb60e/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
new file mode 100644
index 0000000..9d78260
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
+import org.apache.camel.support.ServiceSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements AsyncProcessorAwaitManager {
+
+    // TODO: capture message history of the exchange when it was interrupted
+    // TODO: capture route id, node id where thread is blocked
+    // TODO: rename to AsyncInflightRepository?
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultAsyncProcessorAwaitManager.class);
+
+    private final Map<Exchange, AwaitThread> inflight = new ConcurrentHashMap<Exchange, AwaitThread>();
+
+    private boolean interruptThreadsWhileStopping = true;
+
+    @Override
+    public void await(Exchange exchange, CountDownLatch latch) {
+        LOG.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}",
+                exchange.getExchangeId(), exchange);
+        try {
+            inflight.put(exchange, new AwaitThreadEntry(Thread.currentThread(), exchange, latch));
+            latch.await();
+            LOG.trace("Asynchronous callback received, will continue routing exchangeId: {} -> {}",
+                    exchange.getExchangeId(), exchange);
+
+        } catch (InterruptedException e) {
+            LOG.trace("Interrupted while waiting for callback, will continue routing exchangeId: {} -> {}",
+                    exchange.getExchangeId(), exchange);
+            exchange.setException(e);
+        } finally {
+            inflight.remove(exchange);
+        }
+    }
+
+    @Override
+    public void countDown(Exchange exchange, CountDownLatch latch) {
+        LOG.trace("Asynchronous callback received for exchangeId: {}", exchange.getExchangeId());
+        latch.countDown();
+    }
+
+    @Override
+    public int size() {
+        return inflight.size();
+    }
+
+    @Override
+    public Collection<AwaitThread> browse() {
+        return Collections.unmodifiableCollection(inflight.values());
+    }
+
+    @Override
+    public void interrupt(Exchange exchange) {
+        AwaitThreadEntry latch = (AwaitThreadEntry) inflight.get(exchange);
+        if (latch != null) {
+            LOG.warn("Interrupted while waiting for asynchronous callback, will continue routing exchangeId: {} -> {}",
+                    exchange.getExchangeId(), exchange);
+            exchange.setException(new RejectedExecutionException("Interrupted while waiting for asynchronous callback"));
+            latch.getLatch().countDown();
+        }
+    }
+
+    public boolean isInterruptThreadsWhileStopping() {
+        return interruptThreadsWhileStopping;
+    }
+
+    public void setInterruptThreadsWhileStopping(boolean interruptThreadsWhileStopping) {
+        this.interruptThreadsWhileStopping = interruptThreadsWhileStopping;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        // noop
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        Collection<AwaitThread> threads = browse();
+        int count = threads.size();
+        if (count > 0) {
+            LOG.warn("Shutting down while there are still " + count + " inflight threads currently blocked.");
+
+            StringBuilder sb = new StringBuilder();
+            for (AwaitThread entry : threads) {
+                sb.append("\tBlocked thread: ").append(entry.getBlockedThread().getName())
+                        .append(", exchangeId=").append(entry.getExchange().getExchangeId())
+                        .append(", duration=").append(entry.getWaitDuration()).append(" msec.");
+            }
+
+            if (isInterruptThreadsWhileStopping()) {
+                LOG.warn("The following threads are blocked and will be interrupted so the threads are released:\n" + sb.toString());
+                for (AwaitThread entry : threads) {
+                    try {
+                        interrupt(entry.getExchange());
+                    } catch (Throwable e) {
+                        LOG.warn("Error while interrupting thread: " + entry.getBlockedThread().getName() + ". This exception is ignored.", e);
+                    }
+                }
+            } else {
+                LOG.warn("The following threads are blocked, and may reside in the JVM:\n" + sb.toString());
+            }
+        } else {
+            LOG.debug("Shutting down with no inflight threads.");
+        }
+
+        inflight.clear();
+    }
+
+    private static final class AwaitThreadEntry implements AwaitThread {
+        private final Thread thread;
+        private final Exchange exchange;
+        private final CountDownLatch latch;
+        private final long start;
+
+        private AwaitThreadEntry(Thread thread, Exchange exchange, CountDownLatch latch) {
+            this.thread = thread;
+            this.exchange = exchange;
+            this.latch = latch;
+            this.start = System.currentTimeMillis();
+        }
+
+        @Override
+        public Thread getBlockedThread() {
+            return thread;
+        }
+
+        @Override
+        public Exchange getExchange() {
+            return exchange;
+        }
+
+        @Override
+        public long getWaitDuration() {
+            return System.currentTimeMillis() - start;
+        }
+
+        public CountDownLatch getLatch() {
+            return latch;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/057fb60e/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index 7eb7fe6..7e526f5 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -94,6 +94,7 @@ import org.apache.camel.processor.interceptor.Delayer;
 import org.apache.camel.processor.interceptor.HandleFault;
 import org.apache.camel.processor.interceptor.StreamCaching;
 import org.apache.camel.processor.interceptor.Tracer;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.CamelContextNameStrategy;
 import org.apache.camel.spi.ClassResolver;
 import org.apache.camel.spi.ComponentResolver;
@@ -231,6 +232,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
     private InterceptStrategy defaultBacklogTracer;
     private InterceptStrategy defaultBacklogDebugger;
     private InflightRepository inflightRepository = new DefaultInflightRepository();
+    private AsyncProcessorAwaitManager asyncProcessorAwaitManager = new DefaultAsyncProcessorAwaitManager();
     private RuntimeEndpointRegistry runtimeEndpointRegistry = new DefaultRuntimeEndpointRegistry();
     private final List<RouteStartupOrder> routeStartupOrder = new ArrayList<RouteStartupOrder>();
     // start auto assigning route ids using numbering 1000 and upwards
@@ -2082,6 +2084,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
         doAddService(executorServiceManager, false);
         addService(producerServicePool);
         addService(inflightRepository);
+        addService(asyncProcessorAwaitManager);
         addService(shutdownStrategy);
         addService(packageScanClassResolver);
         addService(restRegistry);
@@ -2972,6 +2975,14 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
         this.inflightRepository = repository;
     }
 
+    public AsyncProcessorAwaitManager getAsyncProcessorAwaitManager() {
+        return asyncProcessorAwaitManager;
+    }
+
+    public void setAsyncProcessorAwaitManager(AsyncProcessorAwaitManager asyncProcessorAwaitManager) {
+        this.asyncProcessorAwaitManager = asyncProcessorAwaitManager;
+    }
+
     public void setAutoStartup(Boolean autoStartup) {
         this.autoStartup = autoStartup;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/057fb60e/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
index 3e44b60..6291545 100644
--- a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementLifecycleStrategy.java
@@ -51,6 +51,7 @@ import org.apache.camel.impl.EndpointRegistry;
 import org.apache.camel.impl.EventDrivenConsumerRoute;
 import org.apache.camel.impl.ProducerCache;
 import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
+import org.apache.camel.management.mbean.ManagedAsyncProcessorAwaitManager;
 import org.apache.camel.management.mbean.ManagedBacklogDebugger;
 import org.apache.camel.management.mbean.ManagedBacklogTracer;
 import org.apache.camel.management.mbean.ManagedCamelContext;
@@ -78,6 +79,7 @@ import org.apache.camel.processor.CamelInternalProcessor;
 import org.apache.camel.processor.interceptor.BacklogDebugger;
 import org.apache.camel.processor.interceptor.BacklogTracer;
 import org.apache.camel.processor.interceptor.Tracer;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.EventNotifier;
 import org.apache.camel.spi.LifecycleStrategy;
 import org.apache.camel.spi.ManagementAgent;
@@ -466,6 +468,8 @@ public class DefaultManagementLifecycleStrategy extends ServiceSupport implement
             answer = new ManagedTypeConverterRegistry(context, (TypeConverterRegistry) service);
         } else if (service instanceof RestRegistry) {
             answer = new ManagedRestRegistry(context, (RestRegistry) service);
+        } else if (service instanceof AsyncProcessorAwaitManager) {
+            answer = new ManagedAsyncProcessorAwaitManager(context, (AsyncProcessorAwaitManager) service);
         } else if (service instanceof RuntimeEndpointRegistry) {
             answer = new ManagedRuntimeEndpointRegistry(context, (RuntimeEndpointRegistry) service);
         } else if (service instanceof StreamCachingStrategy) {

http://git-wip-us.apache.org/repos/asf/camel/blob/057fb60e/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java
new file mode 100644
index 0000000..bd32b4e
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management.mbean;
+
+import java.util.Collection;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.api.management.mbean.CamelOpenMBeanTypes;
+import org.apache.camel.api.management.mbean.ManagedAsyncProcessorAwaitManagerMBean;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
+import org.apache.camel.util.ObjectHelper;
+
+/**
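+ * JMX managed view of an {@link AsyncProcessorAwaitManager}, delegating its attributes and operations to the wrapped manager.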
+ */
+@ManagedResource(description = "Managed AsyncProcessorAwaitManager")
+public class ManagedAsyncProcessorAwaitManager extends ManagedService implements ManagedAsyncProcessorAwaitManagerMBean {
+
+    private final AsyncProcessorAwaitManager manager;
+
+    public ManagedAsyncProcessorAwaitManager(CamelContext context, AsyncProcessorAwaitManager manager) {
+        super(context, manager);
+        this.manager = manager;
+    }
+
+    public AsyncProcessorAwaitManager getAsyncProcessorAwaitManager() {
+        return manager;
+    }
+
+    @Override
+    public boolean isInterruptThreadsWhileStopping() {
+        return manager.isInterruptThreadsWhileStopping();
+    }
+
+    @Override
+    public void setInterruptThreadsWhileStopping(boolean interruptThreadsWhileStopping) {
+        manager.setInterruptThreadsWhileStopping(interruptThreadsWhileStopping);
+    }
+
+    @Override
+    public int getSize() {
+        return manager.size();
+    }
+
+    @Override
+    public TabularData browse() {
+        try {
+            TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.listAwaitThreadsTabularType());
+            Collection<AsyncProcessorAwaitManager.AwaitThread> threads = manager.browse();
+            for (AsyncProcessorAwaitManager.AwaitThread entry : threads) {
+                CompositeType ct = CamelOpenMBeanTypes.listAwaitThreadsCompositeType();
+                String name = entry.getBlockedThread().getName();
+                String exchangeId = entry.getExchange().getExchangeId();
+                String duration = "" + entry.getWaitDuration();
+
+                CompositeData data = new CompositeDataSupport(ct, new String[]
+                        {"name", "exchangeId", "duration"},
+                        new Object[]{name, exchangeId, duration});
+                answer.put(data);
+            }
+            return answer;
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+    }
+
+    @Override
+    public void interrupt(String exchangeId) {
+        // need to find the exchange with the given exchange id
+        Exchange found = null;
+        for (AsyncProcessorAwaitManager.AwaitThread entry : manager.browse()) {
+            Exchange exchange = entry.getExchange();
+            if (exchangeId.equals(exchange.getExchangeId())) {
+                found = exchange;
+                break;
+            }
+        }
+
+        if (found != null) {
+            manager.interrupt(found);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/057fb60e/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
new file mode 100644
index 0000000..7780e3f
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.StaticService;
+
+/**
+ * A manager for the async routing engine, used when {@link Exchange}s are being handed over from one thread to another, while
+ * the calling thread is blocked waiting for the other threads to complete, before it can continue.
+ * <p/>
+ * This manager offers insight into the state, and allows stuck exchanges to be forced to continue so that blocked threads
+ * can continue, in case of malfunctions.
+ */
+public interface AsyncProcessorAwaitManager extends StaticService {
+
+    /**
+     * Information about the thread and exchange that are inflight.
+     */
+    interface AwaitThread {
+
+        /**
+         * The thread which is blocked waiting for other threads to signal the callback.
+         */
+        Thread getBlockedThread();
+
+        /**
+         * The exchange being processed by the other thread.
+         */
+        Exchange getExchange();
+
+        /**
+         * Time in millis the thread has been blocked waiting for the signal.
+         */
+        long getWaitDuration();
+    }
+
+    /**
+     * Registers the exchange to await the callback to be triggered by another thread which has taken over processing
+     * this exchange. The current thread will wait until that callback happens in the future (blocking until this happens).
+     *
+     * @param exchange   the exchange
+     * @param latch      the latch used to wait for the other thread to signal when it is done
+     */
+    void await(Exchange exchange, CountDownLatch latch);
+
+    /**
+     * Triggered when the other thread is done processing the exchange, to signal to the waiting thread that it can take
+     * over control to further process the exchange.
+     *
+     * @param exchange   the exchange
+     * @param latch      the latch used to wait for the other thread to signal when it is done
+     */
+    void countDown(Exchange exchange, CountDownLatch latch);
+
+    /**
+     * Number of threads that are blocked waiting for other threads to trigger the callback when they are done processing
+     * the exchange.
+     */
+    int size();
+
+    /**
+     * A <i>read-only</i> browser of the {@link AwaitThread}s that are currently inflight.
+     */
+    Collection<AwaitThread> browse();
+
+    /**
+     * Interrupts an exchange which may seem to be stuck, forcing the exchange to continue,
+     * allowing any blocking thread to be released.
+     * <p/>
+     * <b>Important:</b> Use this with caution as the other thread is still assumed to be processing the exchange. Though
+     * if it appears that the exchange is <i>stuck</i>, then this method can remedy this, by forcing the latch to count-down
+     * so the blocked thread can continue. An exception is set on the exchange which allows Camel's error handler to deal
+     * with this malfunctioning exchange.
+     *
+     * @param exchange    the exchange to interrupt.
+     */
+    void interrupt(Exchange exchange);
+
+    /**
+     * Whether to interrupt any blocking threads during stopping.
+     * <p/>
+     * This is enabled by default which allows Camel to release any blocked thread during shutting down Camel itself.
+     */
+    boolean isInterruptThreadsWhileStopping();
+
+    /**
+     * Sets whether to interrupt any blocking threads during stopping.
+     * <p/>
+     * This is enabled by default which allows Camel to release any blocked thread during shutting down Camel itself.
+     */
+    void setInterruptThreadsWhileStopping(boolean interruptThreadsWhileStopping);
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/057fb60e/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java b/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
index 2115d3a..cfdfbcc 100644
--- a/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/AsyncProcessorHelper.java
@@ -17,9 +17,11 @@
 package org.apache.camel.util;
 
 import java.util.concurrent.CountDownLatch;
+
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.UnitOfWork;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -101,12 +103,13 @@ public final class AsyncProcessorHelper {
      * @throws Exception can be thrown if waiting is interrupted
      */
     public static void process(final AsyncProcessor processor, final Exchange exchange) throws Exception {
+        final AsyncProcessorAwaitManager awaitManager = exchange.getContext().getAsyncProcessorAwaitManager();
+
         final CountDownLatch latch = new CountDownLatch(1);
         boolean sync = processor.process(exchange, new AsyncCallback() {
             public void done(boolean doneSync) {
                 if (!doneSync) {
-                    LOG.trace("Asynchronous callback received for exchangeId: {}", exchange.getExchangeId());
-                    latch.countDown();
+                    awaitManager.countDown(exchange, latch);
                 }
             }
 
@@ -116,11 +119,7 @@ public final class AsyncProcessorHelper {
             }
         });
         if (!sync) {
-            LOG.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}",
-                    exchange.getExchangeId(), exchange);
-            latch.await();
-            LOG.trace("Asynchronous callback received, will continue routing exchangeId: {} -> {}",
-                    exchange.getExchangeId(), exchange);
+            awaitManager.await(exchange, latch);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/057fb60e/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java b/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
index 0907c1f..a3cf751 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
@@ -51,7 +51,7 @@ public class MultipleLifecycleStrategyTest extends TestSupport {
         context.stop();
 
         List<String> expectedEvents = Arrays.asList("onContextStart", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd",
-                "onServiceAdd", "onServiceAdd", "onServiceAdd", "onComponentAdd", "onEndpointAdd", "onComponentRemove", "onContextStop");
+                "onServiceAdd", "onServiceAdd", "onServiceAdd", "onServiceAdd", "onComponentAdd", "onEndpointAdd", "onComponentRemove", "onContextStop");
         
         assertEquals(expectedEvents, dummy1.getEvents());
         assertEquals(expectedEvents, dummy2.getEvents());

http://git-wip-us.apache.org/repos/asf/camel/blob/057fb60e/camel-core/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
index a8201e3..86ec502 100644
--- a/camel-core/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java
@@ -47,7 +47,7 @@ public class ManagedProducerRouteAddRemoveRegisterAlwaysTest extends ManagementT
 
         // number of services
         Set<ObjectName> names = mbeanServer.queryNames(on, null);
-        assertEquals(8, names.size());
+        assertEquals(9, names.size());
 
         // number of producers
         ObjectName onP = ObjectName.getInstance("org.apache.camel:context=camel-1,type=producers,*");
@@ -72,7 +72,7 @@ public class ManagedProducerRouteAddRemoveRegisterAlwaysTest extends ManagementT
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(8, names.size());
+        assertEquals(9, names.size());
 
         // but as its recipient list which is dynamic-to we add new producers because we have register always
         namesP = mbeanServer.queryNames(onP, null);
@@ -87,7 +87,7 @@ public class ManagedProducerRouteAddRemoveRegisterAlwaysTest extends ManagementT
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(8, names.size());
+        assertEquals(9, names.size());
 
         // and we still have the other producers, but not the one from the 2nd route that was removed
         namesP = mbeanServer.queryNames(onP, null);

http://git-wip-us.apache.org/repos/asf/camel/blob/057fb60e/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
index 4ba5c04..46656de 100644
--- a/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java
@@ -59,7 +59,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
         // number of services
         ObjectName on = ObjectName.getInstance("org.apache.camel:context=camel-1,type=services,*");
         Set<ObjectName> names = mbeanServer.queryNames(on, null);
-        assertEquals(8, names.size());
+        assertEquals(9, names.size());
 
         // number of producers
         ObjectName onP = ObjectName.getInstance("org.apache.camel:context=camel-1,type=producers,*");
@@ -84,7 +84,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(8, names.size());
+        assertEquals(9, names.size());
 
         // but we should have one more producer
         namesP = mbeanServer.queryNames(onP, null);
@@ -99,7 +99,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(8, names.size());
+        assertEquals(9, names.size());
 
         // and the 2nd producer should be removed
         namesP = mbeanServer.queryNames(onP, null);
@@ -119,7 +119,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // number of services
         Set<ObjectName> names = mbeanServer.queryNames(on, null);
-        assertEquals(8, names.size());
+        assertEquals(9, names.size());
 
         // number of producers
         ObjectName onP = ObjectName.getInstance("org.apache.camel:context=camel-1,type=producers,*");
@@ -144,7 +144,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(8, names.size());
+        assertEquals(9, names.size());
 
         // but as its recipient list which is dynamic-to we do not add a new producer
         namesP = mbeanServer.queryNames(onP, null);
@@ -159,7 +159,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(8, names.size());
+        assertEquals(9, names.size());
 
         // and we still have the original producer
         namesP = mbeanServer.queryNames(onP, null);
@@ -179,7 +179,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // number of services
         Set<ObjectName> names = mbeanServer.queryNames(on, null);
-        assertEquals(8, names.size());
+        assertEquals(9, names.size());
 
         // number of producers
         ObjectName onP = ObjectName.getInstance("org.apache.camel:context=camel-1,type=producers,*");
@@ -204,7 +204,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(8, names.size());
+        assertEquals(9, names.size());
 
         // but as its recipient list which is dynamic-to we do not add a new producer
         namesP = mbeanServer.queryNames(onP, null);
@@ -219,7 +219,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(8, names.size());
+        assertEquals(9, names.size());
 
         // and we still have the original producer
         namesP = mbeanServer.queryNames(onP, null);
@@ -239,7 +239,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // number of services
         Set<ObjectName> names = mbeanServer.queryNames(on, null);
-        assertEquals(8, names.size());
+        assertEquals(9, names.size());
 
         log.info("Adding 2nd route");
 
@@ -269,7 +269,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(8, names.size());
+        assertEquals(9, names.size());
 
         // now stop and remove the 2nd route
         log.info("Stopping 2nd route");
@@ -281,7 +281,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(8, names.size());
+        assertEquals(9, names.size());
 
         log.info("Shutting down...");
     }
@@ -297,7 +297,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // number of services
         Set<ObjectName> names = mbeanServer.queryNames(on, null);
-        assertEquals(8, names.size());
+        assertEquals(9, names.size());
 
         log.info("Adding 2nd route");
 
@@ -328,7 +328,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(8, names.size());
+        assertEquals(9, names.size());
 
         // now stop and remove the 2nd route
         log.info("Stopping 2nd route");
@@ -340,7 +340,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(8, names.size());
+        assertEquals(9, names.size());
 
         log.info("Shutting down...");
     }
@@ -356,7 +356,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // number of services
         Set<ObjectName> names = mbeanServer.queryNames(on, null);
-        assertEquals(8, names.size());
+        assertEquals(9, names.size());
 
         log.info("Adding 2nd route");
 
@@ -385,7 +385,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(8, names.size());
+        assertEquals(9, names.size());
 
         // now stop and remove the 2nd route
         log.info("Stopping 2nd route");
@@ -397,7 +397,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(8, names.size());
+        assertEquals(9, names.size());
 
         log.info("Shutting down...");
     }
@@ -413,7 +413,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // number of services
         Set<ObjectName> names = mbeanServer.queryNames(on, null);
-        assertEquals(8, names.size());
+        assertEquals(9, names.size());
 
         log.info("Adding 2nd route");
 
@@ -443,7 +443,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(8, names.size());
+        assertEquals(9, names.size());
 
         // now stop and remove the 2nd route
         log.info("Stopping 2nd route");
@@ -455,7 +455,7 @@ public class ManagedRouteAddRemoveTest extends ManagementTestSupport {
 
         // there should still be the same number of services
         names = mbeanServer.queryNames(on, null);
-        assertEquals(8, names.size());
+        assertEquals(9, names.size());
 
         log.info("Shutting down...");
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/057fb60e/camel-core/src/test/java/org/apache/camel/processor/ThroughPutLoggerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThroughPutLoggerTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThroughPutLoggerTest.java
index 9a3601d..8b6f667 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThroughPutLoggerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThroughPutLoggerTest.java
@@ -17,6 +17,9 @@
 package org.apache.camel.processor;
 
 import junit.framework.TestCase;
+import org.apache.camel.CamelContext;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.util.CamelLogger;
 import org.easymock.EasyMock;
 import org.slf4j.Logger;
@@ -25,6 +28,9 @@ import org.slf4j.Marker;
 public class ThroughPutLoggerTest extends TestCase {
 
     public void testLogStringDurationIsNotZero() throws Exception {
+        CamelContext camel = new DefaultCamelContext();
+        camel.start();
+
         Logger logger = EasyMock.createMock(Logger.class);
         logger.isInfoEnabled();
         EasyMock.expectLastCall().andReturn(true).atLeastOnce();
@@ -36,8 +42,10 @@ public class ThroughPutLoggerTest extends TestCase {
         ThroughputLogger underTest = new ThroughputLogger(new CamelLogger(logger));
         underTest.setGroupSize(10);
         for (int i = 0; i < 25; i++) {
-            underTest.process(null);
+            underTest.process(new DefaultExchange(camel));
         }
         EasyMock.verify(logger);
+
+        camel.stop();
     }
 }


[4/5] camel git commit: CAMEL-8165: Async routing engine - Add insight into threads blocked waiting for callbacks

Posted by da...@apache.org.
CAMEL-8165: Async routing engine - Add insight into threads blocked waiting for callbacks


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cd6cd485
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cd6cd485
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cd6cd485

Branch: refs/heads/master
Commit: cd6cd48551cdbffb59c12fd0903749397583c623
Parents: 9e010df
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Dec 20 18:21:09 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Dec 20 18:21:09 2014 +0100

----------------------------------------------------------------------
 .../processor/async/AsyncProcessorAwaitManagerInterruptTest.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/cd6cd485/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
index d78f7f1..2d97c80 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
@@ -60,7 +60,7 @@ public class AsyncProcessorAwaitManagerInterruptTest extends ContextTestSupport
 
                 from("direct:start").routeId("myRoute")
                         .to("mock:before")
-                        .to("async:bye:camel").id("myAsync")
+                        .to("async:bye:camel?delay=500").id("myAsync")
                         .to("mock:after")
                         .process(new Processor() {
                             @Override


[2/5] camel git commit: CAMEL-8165: Async routing engine - Add insight into threads blocked waiting for callbacks

Posted by da...@apache.org.
CAMEL-8165: Async routing engine - Add insight into threads blocked waiting for callbacks


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/36c2d70d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/36c2d70d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/36c2d70d

Branch: refs/heads/master
Commit: 36c2d70d5f952e4b8eaa9cf9ed0b3af26e5f66ef
Parents: 057fb60
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Dec 20 13:55:07 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Dec 20 14:35:09 2014 +0100

----------------------------------------------------------------------
 .../management/mbean/CamelOpenMBeanTypes.java   |   8 +-
 .../impl/DefaultAsyncProcessorAwaitManager.java | 120 ++++++++++++++++---
 .../ManagedAsyncProcessorAwaitManager.java      |  24 ++--
 .../camel/spi/AsyncProcessorAwaitManager.java   |  28 +++++
 ...AsyncProcessorAwaitManagerInterruptTest.java |  86 +++++++++++++
 .../async/AsyncProcessorAwaitManagerTest.java   |  80 +++++++++++++
 6 files changed, 311 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/36c2d70d/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
index 7c88dd9..070f5c8 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java
@@ -90,13 +90,13 @@ public final class CamelOpenMBeanTypes {
 
     public static TabularType listAwaitThreadsTabularType() throws OpenDataException {
         CompositeType ct = listAwaitThreadsCompositeType();
-        return new TabularType("listAwaitThreads", "Lists blocked threads by the routing engine", ct, new String[]{"name"});
+        return new TabularType("listAwaitThreads", "Lists blocked threads by the routing engine", ct, new String[]{"id"});
     }
 
     public static CompositeType listAwaitThreadsCompositeType() throws OpenDataException {
-        return new CompositeType("threads", "Threads", new String[]{"name", "exchangeId", "duration"},
-                new String[]{"Thread name", "ExchangeId", "Duration"},
-                new OpenType[]{SimpleType.STRING, SimpleType.STRING, SimpleType.STRING});
+        return new CompositeType("threads", "Threads", new String[]{"id", "name", "exchangeId", "routeId", "nodeId", "duration"},
+                new String[]{"Thread Id", "Thread name", "ExchangeId", "RouteId", "NodeId", "Duration"},
+                new OpenType[]{SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.STRING});
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/36c2d70d/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
index 9d78260..20c2927 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
@@ -18,29 +18,41 @@ package org.apache.camel.impl;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.MessageHistory;
+import org.apache.camel.processor.DefaultExchangeFormatter;
 import org.apache.camel.spi.AsyncProcessorAwaitManager;
+import org.apache.camel.spi.ExchangeFormatter;
 import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.MessageHelper;
+import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements AsyncProcessorAwaitManager {
 
-    // TODO: capture message history of the exchange when it was interrupted
-    // TODO: capture route id, node id where thread is blocked
-    // TODO: rename to AsyncInflightRepository?
-
     private static final Logger LOG = LoggerFactory.getLogger(DefaultAsyncProcessorAwaitManager.class);
 
     private final Map<Exchange, AwaitThread> inflight = new ConcurrentHashMap<Exchange, AwaitThread>();
-
+    private final ExchangeFormatter exchangeFormatter;
     private boolean interruptThreadsWhileStopping = true;
 
+    public DefaultAsyncProcessorAwaitManager() {
+        // setup exchange formatter to be used for message history dump
+        DefaultExchangeFormatter formatter = new DefaultExchangeFormatter();
+        formatter.setShowExchangeId(true);
+        formatter.setMultiline(true);
+        formatter.setShowHeaders(true);
+        formatter.setStyle(DefaultExchangeFormatter.OutputStyle.Fixed);
+        this.exchangeFormatter = formatter;
+    }
+
     @Override
     public void await(Exchange exchange, CountDownLatch latch) {
         LOG.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}",
@@ -77,13 +89,47 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
     }
 
     @Override
+    public void interrupt(String exchangeId) {
+        // need to find the exchange with the given exchange id
+        Exchange found = null;
+        for (AsyncProcessorAwaitManager.AwaitThread entry : browse()) {
+            Exchange exchange = entry.getExchange();
+            if (exchangeId.equals(exchange.getExchangeId())) {
+                found = exchange;
+                break;
+            }
+        }
+
+        if (found != null) {
+            interrupt(found);
+        }
+    }
+
+    @Override
     public void interrupt(Exchange exchange) {
-        AwaitThreadEntry latch = (AwaitThreadEntry) inflight.get(exchange);
-        if (latch != null) {
-            LOG.warn("Interrupted while waiting for asynchronous callback, will continue routing exchangeId: {} -> {}",
-                    exchange.getExchangeId(), exchange);
-            exchange.setException(new RejectedExecutionException("Interrupted while waiting for asynchronous callback"));
-            latch.getLatch().countDown();
+        AwaitThreadEntry entry = (AwaitThreadEntry) inflight.get(exchange);
+        if (entry != null) {
+            try {
+                StringBuilder sb = new StringBuilder();
+                sb.append("Interrupted while waiting for asynchronous callback, will release the following blocked thread which was waiting for exchange to finish processing with exchangeId: ");
+                sb.append(exchange.getExchangeId());
+                sb.append("\n");
+
+                sb.append(dumpBlockedThread(entry));
+
+                // dump a route stack trace of the exchange
+                String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, exchangeFormatter, false);
+                if (routeStackTrace != null) {
+                    sb.append(routeStackTrace);
+                }
+                LOG.warn(sb.toString());
+
+            } catch (Exception e) {
+                throw ObjectHelper.wrapRuntimeCamelException(e);
+            } finally {
+                exchange.setException(new RejectedExecutionException("Interrupted while waiting for asynchronous callback for exchangeId: " + exchange.getExchangeId()));
+                entry.getLatch().countDown();
+            }
         }
     }
 
@@ -109,9 +155,7 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
 
             StringBuilder sb = new StringBuilder();
             for (AwaitThread entry : threads) {
-                sb.append("\tBlocked thread: ").append(entry.getBlockedThread().getName())
-                        .append(", exchangeId=").append(entry.getExchange().getExchangeId())
-                        .append(", duration=").append(entry.getWaitDuration()).append(" msec.");
+                sb.append(dumpBlockedThread(entry));
             }
 
             if (isInterruptThreadsWhileStopping()) {
@@ -133,17 +177,50 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
         inflight.clear();
     }
 
+    private static String dumpBlockedThread(AwaitThread entry) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("\n");
+        sb.append("Blocked Thread\n");
+        sb.append("---------------------------------------------------------------------------------------------------------------------------------------\n");
+
+        sb.append(style("Id:")).append(entry.getBlockedThread().getId()).append("\n");
+        sb.append(style("Name:")).append(entry.getBlockedThread().getName()).append("\n");
+        sb.append(style("RouteId:")).append(safeNull(entry.getRouteId())).append("\n");
+        sb.append(style("NodeId:")).append(safeNull(entry.getNodeId())).append("\n");
+        sb.append(style("Duration:")).append(entry.getWaitDuration()).append(" msec.\n");
+        return sb.toString();
+    }
+
+    private static String style(String label) {
+        return String.format("\t%-20s", label);
+    }
+
+    private static String safeNull(Object value) {
+        return value != null ? value.toString() : "";
+    }
+
     private static final class AwaitThreadEntry implements AwaitThread {
         private final Thread thread;
         private final Exchange exchange;
         private final CountDownLatch latch;
         private final long start;
+        private String routeId;
+        private String nodeId;
 
         private AwaitThreadEntry(Thread thread, Exchange exchange, CountDownLatch latch) {
             this.thread = thread;
             this.exchange = exchange;
             this.latch = latch;
             this.start = System.currentTimeMillis();
+
+            // capture details from message history if enabled
+            List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
+            if (list != null && !list.isEmpty()) {
+                // grab last part
+                MessageHistory history = list.get(list.size() - 1);
+                routeId = history.getRouteId();
+                nodeId = history.getNode() != null ? history.getNode().getId() : null;
+            }
         }
 
         @Override
@@ -161,9 +238,24 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
             return System.currentTimeMillis() - start;
         }
 
+        @Override
+        public String getRouteId() {
+            return routeId;
+        }
+
+        @Override
+        public String getNodeId() {
+            return nodeId;
+        }
+
         public CountDownLatch getLatch() {
             return latch;
         }
+
+        @Override
+        public String toString() {
+            return "AwaitThreadEntry[name=" + thread.getName() + ", exchangeId=" + exchange.getExchangeId() + "]";
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/36c2d70d/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java
index bd32b4e..a4759ef 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAsyncProcessorAwaitManager.java
@@ -24,7 +24,6 @@ import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
 import org.apache.camel.api.management.ManagedResource;
 import org.apache.camel.api.management.mbean.CamelOpenMBeanTypes;
 import org.apache.camel.api.management.mbean.ManagedAsyncProcessorAwaitManagerMBean;
@@ -70,13 +69,16 @@ public class ManagedAsyncProcessorAwaitManager extends ManagedService implements
             Collection<AsyncProcessorAwaitManager.AwaitThread> threads = manager.browse();
             for (AsyncProcessorAwaitManager.AwaitThread entry : threads) {
                 CompositeType ct = CamelOpenMBeanTypes.listAwaitThreadsCompositeType();
+                String id = "" + entry.getBlockedThread().getId();
                 String name = entry.getBlockedThread().getName();
                 String exchangeId = entry.getExchange().getExchangeId();
+                String routeId = entry.getRouteId();
+                String nodeId = entry.getNodeId();
                 String duration = "" + entry.getWaitDuration();
 
-                CompositeData data = new CompositeDataSupport(ct, new String[]
-                        {"name", "exchangeId", "duration"},
-                        new Object[]{name, exchangeId, duration});
+                CompositeData data = new CompositeDataSupport(ct,
+                        new String[]{"id", "name", "exchangeId", "routeId", "nodeId", "duration"},
+                        new Object[]{id, name, exchangeId, routeId, nodeId, duration});
                 answer.put(data);
             }
             return answer;
@@ -87,19 +89,7 @@ public class ManagedAsyncProcessorAwaitManager extends ManagedService implements
 
     @Override
     public void interrupt(String exchangeId) {
-        // need to find the exchange with the given exchange id
-        Exchange found = null;
-        for (AsyncProcessorAwaitManager.AwaitThread entry : manager.browse()) {
-            Exchange exchange = entry.getExchange();
-            if (exchangeId.equals(exchange.getExchangeId())) {
-                found = exchange;
-                break;
-            }
-        }
-
-        if (found != null) {
-            manager.interrupt(found);
-        }
+        manager.interrupt(exchangeId);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/36c2d70d/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
index 7780e3f..0b39c1c 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
@@ -50,6 +50,21 @@ public interface AsyncProcessorAwaitManager extends StaticService {
          * Time in millis the thread has been blocked waiting for the signal.
          */
         long getWaitDuration();
+
+        /**
+         * The id of the route where the exchange was processed when the thread was set to block.
+         * <p/>
+         * Is <tt>null</tt> if message history is disabled.
+         */
+        String getRouteId();
+
+        /**
+         * The id of the node from the route where the exchange was processed when the thread was set to block.
+         * <p/>
+         * Is <tt>null</tt> if message history is disabled.
+         */
+        String getNodeId();
+
     }
 
     /**
@@ -90,6 +105,19 @@ public interface AsyncProcessorAwaitManager extends StaticService {
      * so the blocked thread can continue. An exception is set on the exchange which allows Camel's error handler to deal
      * with this malfunctioned exchange.
      *
+     * @param exchangeId    the exchange id to interrupt.
+     */
+    void interrupt(String exchangeId);
+
+    /**
+     * To interrupt an exchange which may seem to be stuck, to force the exchange to continue,
+     * allowing any blocked thread to be released.
+     * <p/>
+     * <b>Important:</b> Use this with caution as the other thread is still assumed to be processing the exchange. Though
+     * if it appears that the exchange is <i>stuck</i>, then this method can remedy this, by forcing the latch to count down
+     * so the blocked thread can continue. An exception is set on the exchange which allows Camel's error handler to deal
+     * with this malfunctioned exchange.
+     *
      * @param exchange    the exchange to interrupt.
      */
     void interrupt(Exchange exchange);

http://git-wip-us.apache.org/repos/asf/camel/blob/36c2d70d/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
new file mode 100644
index 0000000..d78f7f1
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import java.util.Collection;
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
+
+/**
+ * @version 
+ */
+public class AsyncProcessorAwaitManagerInterruptTest extends ContextTestSupport {
+
+    public void testAsyncAwaitInterrupt() throws Exception {
+        assertEquals(0, context.getAsyncProcessorAwaitManager().size());
+
+        getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+        getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+
+        try {
+            template.requestBody("direct:start", "Hello Camel", String.class);
+            fail("Should have thrown exception");
+        } catch (CamelExecutionException e) {
+            RejectedExecutionException cause = assertIsInstanceOf(RejectedExecutionException.class, e.getCause());
+            assertTrue(cause.getMessage().startsWith("Interrupted while waiting for asynchronous callback"));
+        }
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(0, context.getAsyncProcessorAwaitManager().size());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.addComponent("async", new MyAsyncComponent());
+
+                from("direct:start").routeId("myRoute")
+                        .to("mock:before")
+                        .to("async:bye:camel").id("myAsync")
+                        .to("mock:after")
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws Exception {
+                                int size = context.getAsyncProcessorAwaitManager().size();
+                                log.info("async inflight: {}", size);
+                                assertEquals(1, size);
+
+                                Collection<AsyncProcessorAwaitManager.AwaitThread> threads = context.getAsyncProcessorAwaitManager().browse();
+                                AsyncProcessorAwaitManager.AwaitThread thread = threads.iterator().next();
+
+                                // let's interrupt it
+                                String id = thread.getExchange().getExchangeId();
+                                context.getAsyncProcessorAwaitManager().interrupt(id);
+                            }
+                        })
+                        .transform(constant("Hi Camel"))
+                        .to("mock:result");
+            }
+        };
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/36c2d70d/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
new file mode 100644
index 0000000..8389069
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import java.util.Collection;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
+
+/**
+ * @version 
+ */
+public class AsyncProcessorAwaitManagerTest extends ContextTestSupport {
+
+    public void testAsyncAwait() throws Exception {
+        assertEquals(0, context.getAsyncProcessorAwaitManager().size());
+
+        getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+        getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+
+        String reply = template.requestBody("direct:start", "Hello Camel", String.class);
+        assertEquals("Bye Camel", reply);
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(0, context.getAsyncProcessorAwaitManager().size());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.addComponent("async", new MyAsyncComponent());
+
+                from("direct:start").routeId("myRoute")
+                        .to("mock:before")
+                        .to("async:bye:camel").id("myAsync")
+                        .to("mock:after")
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws Exception {
+                                int size = context.getAsyncProcessorAwaitManager().size();
+                                log.info("async inflight: {}", size);
+                                assertEquals(1, size);
+
+                                Collection<AsyncProcessorAwaitManager.AwaitThread> threads = context.getAsyncProcessorAwaitManager().browse();
+                                AsyncProcessorAwaitManager.AwaitThread thread = threads.iterator().next();
+
+                                long wait = thread.getWaitDuration();
+                                log.info("Thread {} has waited for {} msec.", thread.getBlockedThread().getName(), wait);
+
+                                assertEquals("myRoute", thread.getRouteId());
+                                assertEquals("myAsync", thread.getNodeId());
+                            }
+                        })
+                        .to("mock:result");
+            }
+        };
+    }
+
+}
\ No newline at end of file


[5/5] camel git commit: CAMEL-8165: Async routing engine - Add insight into threads blocked waiting for callbacks

Posted by da...@apache.org.
CAMEL-8165: Async routing engine - Add insight into threads blocked waiting for callbacks


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bf19896c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bf19896c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bf19896c

Branch: refs/heads/master
Commit: bf19896c0cfce610d704b5f6eb626875afe5af76
Parents: cd6cd48
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Dec 21 08:14:08 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Dec 21 08:50:45 2014 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/camel/impl/DefaultCamelContext.java   | 3 +++
 .../java/org/apache/camel/spi/AsyncProcessorAwaitManager.java  | 5 +++--
 .../apache/camel/core/xml/AbstractCamelContextFactoryBean.java | 6 ++++++
 3 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/bf19896c/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index 7e526f5..eb784b6 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -2167,6 +2167,9 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
         }
         getRouteStartupOrder().clear();
 
+        // shutdown await manager to trigger interrupt of blocked threads to attempt to free these threads gracefully
+        shutdownServices(asyncProcessorAwaitManager);
+
         shutdownServices(routeServices.values());
         // do not clear route services or startup listeners as we can start Camel again and get the route back as before
 

http://git-wip-us.apache.org/repos/asf/camel/blob/bf19896c/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
index 0b39c1c..3081449 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/AsyncProcessorAwaitManager.java
@@ -26,8 +26,9 @@ import org.apache.camel.StaticService;
  * A manager to handle async routing engine, when {@link Exchange}s are being handed over from one thread to another, while
  * the callee thread is blocked waiting for the other threads to complete, before it can continue.
  * <p/>
- * This manager offers insight into the state, and allow to force stuck exchanges to be continued and threads to continue,
- * in case of malfunctions.
+ * This manager offers insight into the state, and allows forcing stuck exchanges to continue and blocked threads
+ * to be unblocked. Such situations may happen in case of severe malfunctions (such as the system running out of memory,
+ * a 3rd party never responding, or a timeout not triggering, etc).
  */
 public interface AsyncProcessorAwaitManager extends StaticService {
 

http://git-wip-us.apache.org/repos/asf/camel/blob/bf19896c/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
----------------------------------------------------------------------
diff --git a/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java b/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
index 32b84a8..8aca563 100644
--- a/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
+++ b/components/camel-core-xml/src/main/java/org/apache/camel/core/xml/AbstractCamelContextFactoryBean.java
@@ -69,6 +69,7 @@ import org.apache.camel.processor.interceptor.BacklogTracer;
 import org.apache.camel.processor.interceptor.HandleFault;
 import org.apache.camel.processor.interceptor.TraceFormatter;
 import org.apache.camel.processor.interceptor.Tracer;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
 import org.apache.camel.spi.ClassResolver;
 import org.apache.camel.spi.Debugger;
 import org.apache.camel.spi.EndpointStrategy;
@@ -204,6 +205,11 @@ public abstract class AbstractCamelContextFactoryBean<T extends ModelCamelContex
             LOG.info("Using custom InflightRepository: {}", inflightRepository);
             getContext().setInflightRepository(inflightRepository);
         }
+        AsyncProcessorAwaitManager asyncProcessorAwaitManager = getBeanForType(AsyncProcessorAwaitManager.class);
+        if (asyncProcessorAwaitManager != null) {
+            LOG.info("Using custom AsyncProcessorAwaitManager: {}", asyncProcessorAwaitManager);
+            getContext().setAsyncProcessorAwaitManager(asyncProcessorAwaitManager);
+        }
         ManagementStrategy managementStrategy = getBeanForType(ManagementStrategy.class);
         if (managementStrategy != null) {
             LOG.info("Using custom ManagementStrategy: {}", managementStrategy);


[3/5] camel git commit: Fixed CS

Posted by da...@apache.org.
Fixed CS


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9e010df0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9e010df0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9e010df0

Branch: refs/heads/master
Commit: 9e010df0fb7a5be9f41c8d1440dbeac4a0c65c6b
Parents: 36c2d70
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Dec 20 14:36:00 2014 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Dec 20 14:36:00 2014 +0100

----------------------------------------------------------------------
 .../src/main/java/org/apache/camel/component/bean/MethodInfo.java | 3 ---
 1 file changed, 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/9e010df0/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java b/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
index aaf3cf0..61f0882 100644
--- a/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
+++ b/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
@@ -478,20 +478,17 @@ public class MethodInfo {
                             // evaluate the parameter value binding
                             value = evaluateParameterValue(exchange, i, parameterValue, parameterType);
                         }
-
                         // use bean parameter binding, if still no value
                         Expression expression = expressions[i];
                         if (value == null && expression != null) {
                             value = evaluateParameterBinding(exchange, expression, i, parameterType);
                         }
                     }
-
                     // remember the value to use
                     if (value != Void.TYPE) {
                         answer[i] = value;
                     }
                 }
-
                 return (T) answer;
             }