You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2018/07/04 07:40:30 UTC
[camel] branch master updated: CAMEL-12603 - Now setting the
Exchange.INTERRUPTED property on an exchange interrupted in the
DefaultAsyncProcessorAwaitManager.
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new d08c1ca CAMEL-12603 - Now setting the Exchange.INTERRUPTED property on an exchange interrupted in the DefaultAsyncProcessorAwaitManager.
d08c1ca is described below
commit d08c1ca0cbf6abd6b67ac19c2295a13df9acb729
Author: Nick Horne <ni...@hotmail.com>
AuthorDate: Wed Jun 27 14:50:43 2018 +0100
CAMEL-12603 - Now setting the Exchange.INTERRUPTED property on an exchange interrupted in the DefaultAsyncProcessorAwaitManager.
---
.../impl/DefaultAsyncProcessorAwaitManager.java | 1 +
...sorAwaitManagerInterruptWithRedeliveryTest.java | 134 +++++++++++++++++++++
2 files changed, 135 insertions(+)
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
index c8dafaa..4a9ef4d 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
@@ -161,6 +161,7 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
interruptedCounter.incrementAndGet();
}
exchange.setException(new RejectedExecutionException("Interrupted while waiting for asynchronous callback for exchangeId: " + exchange.getExchangeId()));
+ exchange.setProperty(Exchange.INTERRUPTED, Boolean.TRUE);
entry.getLatch().countDown();
}
}
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptWithRedeliveryTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptWithRedeliveryTest.java
new file mode 100644
index 0000000..c400d4d
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptWithRedeliveryTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
+import org.apache.camel.util.jndi.JndiContext;
+
+import javax.naming.Context;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+public class AsyncProcessorAwaitManagerInterruptWithRedeliveryTest extends ContextTestSupport {
+ private CountDownLatch latch;
+ private MyBean bean;
+
+ @Override
+ protected void setUp() throws Exception {
+ latch = new CountDownLatch(2);
+ bean = spy(new MyBean(latch));
+ super.setUp();
+ }
+
+ public void testAsyncAwaitInterrupt() throws Exception {
+ context.getAsyncProcessorAwaitManager().getStatistics().setStatisticsEnabled(true);
+
+ assertEquals(0, context.getAsyncProcessorAwaitManager().size());
+
+ getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+ getMockEndpoint("mock:result").expectedMessageCount(0);
+ getMockEndpoint("mock:error").expectedMessageCount(0);
+
+ createThreadToInterrupt();
+ try {
+ template.sendBody("direct:start", "Hello Camel");
+ fail("Should throw exception");
+ } catch (CamelExecutionException e) {
+ RejectedExecutionException cause = assertIsInstanceOf(RejectedExecutionException.class, e.getCause());
+ assertTrue(cause.getMessage().startsWith("Interrupted while waiting for asynchronous callback"));
+ }
+
+ assertMockEndpointsSatisfied();
+
+ // Check we have not reached the full 5 re-deliveries
+ verify(bean, atMost(4)).callMe();
+
+ assertEquals(0, context.getAsyncProcessorAwaitManager().size());
+ assertEquals(1, context.getAsyncProcessorAwaitManager().getStatistics().getThreadsBlocked());
+ assertEquals(1, context.getAsyncProcessorAwaitManager().getStatistics().getThreadsInterrupted());
+ }
+
+ private void createThreadToInterrupt() {
+ new Thread(() -> {
+ // Allow some time for camel exchange to enter the re-deliveries
+ try {
+ latch.await(1, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ // Get our blocked thread
+ int size = context.getAsyncProcessorAwaitManager().size();
+ assertEquals(1, size);
+
+ Collection<AsyncProcessorAwaitManager.AwaitThread> threads = context.getAsyncProcessorAwaitManager().browse();
+ AsyncProcessorAwaitManager.AwaitThread thread = threads.iterator().next();
+
+ // Interrupt it
+ String id = thread.getExchange().getExchangeId();
+ context.getAsyncProcessorAwaitManager().interrupt(id);
+ }).start();
+ }
+
+ @Override
+ protected Context createJndiContext() throws Exception {
+ JndiContext jndiContext = new JndiContext();
+
+ jndiContext.bind("myBean", bean);
+ return jndiContext;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ errorHandler(deadLetterChannel("mock:error")
+ .maximumRedeliveries(5)
+ .redeliveryDelay(100)
+ .asyncDelayedRedelivery());
+
+ from("direct:start").routeId("myRoute")
+ .to("mock:before")
+ .bean("myBean", "callMe")
+ .to("mock:result");
+ }
+ };
+ }
+
+ public static class MyBean {
+ private CountDownLatch latch;
+
+ public MyBean(CountDownLatch latch) {
+ this.latch = latch;
+ }
+
+ public void callMe() throws Exception {
+ latch.countDown();
+ throw new Exception();
+ }
+ }
+}