You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2018/07/04 07:40:30 UTC

[camel] branch master updated: CAMEL-12603 - Now setting the Exchange.INTERRUPTED property on an exchange interrupted in the DefaultAsyncProcessorAwaitManager.

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new d08c1ca  CAMEL-12603 - Now setting the Exchange.INTERRUPTED property on an exchange interrupted in the DefaultAsyncProcessorAwaitManager.
d08c1ca is described below

commit d08c1ca0cbf6abd6b67ac19c2295a13df9acb729
Author: Nick Horne <ni...@hotmail.com>
AuthorDate: Wed Jun 27 14:50:43 2018 +0100

    CAMEL-12603 - Now setting the Exchange.INTERRUPTED property on an exchange interrupted in the DefaultAsyncProcessorAwaitManager.
---
 .../impl/DefaultAsyncProcessorAwaitManager.java    |   1 +
 ...sorAwaitManagerInterruptWithRedeliveryTest.java | 134 +++++++++++++++++++++
 2 files changed, 135 insertions(+)

diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
index c8dafaa..4a9ef4d 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultAsyncProcessorAwaitManager.java
@@ -161,6 +161,7 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements
                     interruptedCounter.incrementAndGet();
                 }
                 exchange.setException(new RejectedExecutionException("Interrupted while waiting for asynchronous callback for exchangeId: " + exchange.getExchangeId()));
+                exchange.setProperty(Exchange.INTERRUPTED, Boolean.TRUE);
                 entry.getLatch().countDown();
             }
         }
diff --git a/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptWithRedeliveryTest.java b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptWithRedeliveryTest.java
new file mode 100644
index 0000000..c400d4d
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/async/AsyncProcessorAwaitManagerInterruptWithRedeliveryTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.AsyncProcessorAwaitManager;
+import org.apache.camel.util.jndi.JndiContext;
+
+import javax.naming.Context;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+public class AsyncProcessorAwaitManagerInterruptWithRedeliveryTest extends ContextTestSupport {
+    private CountDownLatch latch;
+    private MyBean bean;
+
+    @Override
+    protected void setUp() throws Exception {
+        latch = new CountDownLatch(2);
+        bean = spy(new MyBean(latch));
+        super.setUp();
+    }
+
+    public void testAsyncAwaitInterrupt() throws Exception {
+        context.getAsyncProcessorAwaitManager().getStatistics().setStatisticsEnabled(true);
+
+        assertEquals(0, context.getAsyncProcessorAwaitManager().size());
+
+        getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+        getMockEndpoint("mock:error").expectedMessageCount(0);
+
+        createThreadToInterrupt();
+        try {
+            template.sendBody("direct:start", "Hello Camel");
+            fail("Should throw exception");
+        } catch (CamelExecutionException e) {
+            RejectedExecutionException cause = assertIsInstanceOf(RejectedExecutionException.class, e.getCause());
+            assertTrue(cause.getMessage().startsWith("Interrupted while waiting for asynchronous callback"));
+        }
+
+        assertMockEndpointsSatisfied();
+
+        // Check we have not reached the full 5 re-deliveries
+        verify(bean, atMost(4)).callMe();
+
+        assertEquals(0, context.getAsyncProcessorAwaitManager().size());
+        assertEquals(1, context.getAsyncProcessorAwaitManager().getStatistics().getThreadsBlocked());
+        assertEquals(1, context.getAsyncProcessorAwaitManager().getStatistics().getThreadsInterrupted());
+    }
+
+    private void createThreadToInterrupt() {
+        new Thread(() -> {
+            // Allow some time for camel exchange to enter the re-deliveries
+            try {
+                latch.await(1, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+
+            // Get our blocked thread
+            int size = context.getAsyncProcessorAwaitManager().size();
+            assertEquals(1, size);
+
+            Collection<AsyncProcessorAwaitManager.AwaitThread> threads = context.getAsyncProcessorAwaitManager().browse();
+            AsyncProcessorAwaitManager.AwaitThread thread = threads.iterator().next();
+
+            // Interrupt it
+            String id = thread.getExchange().getExchangeId();
+            context.getAsyncProcessorAwaitManager().interrupt(id);
+        }).start();
+    }
+
+    @Override
+    protected Context createJndiContext() throws Exception {
+        JndiContext jndiContext = new JndiContext();
+
+        jndiContext.bind("myBean", bean);
+        return jndiContext;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                errorHandler(deadLetterChannel("mock:error")
+                    .maximumRedeliveries(5)
+                    .redeliveryDelay(100)
+                    .asyncDelayedRedelivery());
+
+                from("direct:start").routeId("myRoute")
+                    .to("mock:before")
+                    .bean("myBean", "callMe")
+                    .to("mock:result");
+            }
+        };
+    }
+
+    public static class MyBean {
+        private CountDownLatch latch;
+
+        public MyBean(CountDownLatch latch) {
+            this.latch = latch;
+        }
+
+        public void callMe() throws Exception {
+            latch.countDown();
+            throw new Exception();
+        }
+    }
+}