You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/07/12 18:54:17 UTC

camel git commit: CAMEL-11539: WireTap - Add support for defer shutdown if pending tasks are active

Repository: camel
Updated Branches:
  refs/heads/master fb0ab0e96 -> e8a287f42


CAMEL-11539: WireTap - Add support for defer shutdown if pending tasks are active


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e8a287f4
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e8a287f4
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e8a287f4

Branch: refs/heads/master
Commit: e8a287f42ab5d180db151441513567b886889171
Parents: fb0ab0e
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Jul 12 20:18:06 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Jul 12 20:44:21 2017 +0200

----------------------------------------------------------------------
 .../management/mbean/ManagedWireTapMBean.java   |  3 +
 .../mbean/ManagedWireTapProcessor.java          |  4 +
 .../camel/processor/WireTapProcessor.java       | 26 +++++-
 .../processor/WireTapShutdownBeanTest.java      | 95 +++++++++++++++++++
 .../processor/WireTapShutdownRouteTest.java     | 98 ++++++++++++++++++++
 .../camel/processor/WireTapVoidBeanTest.java    |  6 +-
 6 files changed, 228 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e8a287f4/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java
index ac86d53..d86ede7 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java
@@ -35,6 +35,9 @@ public interface ManagedWireTapMBean extends ManagedProcessorMBean, ManagedExten
     @ManagedAttribute(description = "Uses a copy of the original exchange")
     Boolean isCopy();
 
+    @ManagedAttribute(description = "Current size of inflight wire tapped exchanges.")
+    Integer getTaskSize();
+
     @ManagedOperation(description = "Statistics of the endpoints which has been sent to")
     TabularData extendedInformation();
 

http://git-wip-us.apache.org/repos/asf/camel/blob/e8a287f4/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java
index ceaa0e2..8000799 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java
@@ -91,6 +91,10 @@ public class ManagedWireTapProcessor extends ManagedProcessor implements Managed
         return processor.isCopy();
     }
 
+    public Integer getTaskSize() {
+        return processor.getPendingExchangesSize();
+    }
+
     @Override
     public TabularData extendedInformation() {
         try {

http://git-wip-us.apache.org/repos/asf/camel/blob/e8a287f4/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index c6df8f5..3bb8e52 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -21,6 +21,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
@@ -31,11 +33,13 @@ import org.apache.camel.ExchangePattern;
 import org.apache.camel.Expression;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.ShutdownRunningTask;
 import org.apache.camel.StreamCache;
 import org.apache.camel.Traceable;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.spi.EndpointUtilizationStatistics;
 import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ShutdownAware;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ExchangeHelper;
@@ -49,7 +53,7 @@ import org.slf4j.LoggerFactory;
  *
  * @version 
  */
-public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware, CamelContextAware {
+public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, Traceable, ShutdownAware, IdAware, CamelContextAware {
     private static final Logger LOG = LoggerFactory.getLogger(WireTapProcessor.class);
     private String id;
     private CamelContext camelContext;
@@ -59,6 +63,7 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor,
     private final ExchangePattern exchangePattern;
     private final ExecutorService executorService;
     private volatile boolean shutdownExecutorService;
+    private final LongAdder taskCount = new LongAdder();
 
     // expression or processor used for populating a new exchange to send
     // as opposed to traditional wiretap that sends a copy of the original exchange
@@ -104,6 +109,22 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor,
         this.camelContext = camelContext;
     }
 
+    @Override
+    public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
+        // not in use
+        return true;
+    }
+
+    @Override
+    public int getPendingExchangesSize() {
+        return taskCount.intValue();
+    }
+
+    @Override
+    public void prepareShutdown(boolean suspendOnly, boolean forced) {
+        // noop
+    }
+
     public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
         return dynamicProcessor.getEndpointUtilizationStatistics();
     }
@@ -132,11 +153,14 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor,
         // send the exchange to the destination using an executor service
         executorService.submit(new Callable<Exchange>() {
             public Exchange call() throws Exception {
+                taskCount.increment();
                 try {
                     LOG.debug(">>>> (wiretap) {} {}", uri, wireTapExchange);
                     processor.process(wireTapExchange);
                 } catch (Throwable e) {
                     LOG.warn("Error occurred during processing " + wireTapExchange + " wiretap to " + uri + ". This exception will be ignored.", e);
+                } finally {
+                    taskCount.decrement();
                 }
                 return wireTapExchange;
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/e8a287f4/camel-core/src/test/java/org/apache/camel/processor/WireTapShutdownBeanTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/WireTapShutdownBeanTest.java b/camel-core/src/test/java/org/apache/camel/processor/WireTapShutdownBeanTest.java
new file mode 100644
index 0000000..7e288ec
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/WireTapShutdownBeanTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.JndiRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Wire tap unit test
+ */
+public class WireTapShutdownBeanTest extends ContextTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(WireTapShutdownBeanTest.class);
+
+    private static final CountDownLatch LATCH = new CountDownLatch(1);
+
+    public void testWireTapShutdown() throws Exception {
+        final MyTapBean tapBean = (MyTapBean) context.getRegistry().lookupByName("tap");
+
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        LATCH.countDown();
+
+        // shutdown Camel which should let the inlfight wire-tap message route to completion
+        context.stop();
+
+        // should allow to shutdown nicely
+        await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertEquals("Hello World", tapBean.getTapped());
+        });
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("tap", new MyTapBean());
+        return jndi;
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:start")
+                    .wireTap("bean:tap").dynamicUri(false)
+                    .to("mock:result");
+            }
+        };
+    }
+
+    public static class MyTapBean {
+
+        private String tapped;
+
+        public void tapSomething(String body) throws Exception {
+            try {
+                LATCH.await(5, TimeUnit.SECONDS);
+                Thread.sleep(100);
+            } catch (Exception e) {
+                fail("Should not be interrupted");
+            }
+            LOG.info("Wire tapping: {}", body);
+            tapped = body;
+        }
+
+        public String getTapped() {
+            return tapped;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/e8a287f4/camel-core/src/test/java/org/apache/camel/processor/WireTapShutdownRouteTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/WireTapShutdownRouteTest.java b/camel-core/src/test/java/org/apache/camel/processor/WireTapShutdownRouteTest.java
new file mode 100644
index 0000000..a008dc5
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/WireTapShutdownRouteTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.JndiRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Wire tap unit test
+ */
+public class WireTapShutdownRouteTest extends ContextTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(WireTapShutdownRouteTest.class);
+
+    private static final CountDownLatch LATCH = new CountDownLatch(1);
+
+    public void testWireTapShutdown() throws Exception {
+        final MyTapBean tapBean = (MyTapBean) context.getRegistry().lookupByName("tap");
+
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        LATCH.countDown();
+
+        // shutdown Camel which should let the inlfight wire-tap message route to completion
+        context.stop();
+
+        // should allow to shutdown nicely
+        await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            assertEquals("Hello World", tapBean.getTapped());
+        });
+    }
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("tap", new MyTapBean());
+        return jndi;
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:start").routeId("foo")
+                    .wireTap("direct:tap")
+                    .to("mock:result");
+
+                from("direct:tap").routeId("bar")
+                    .to("bean:tap");
+            }
+        };
+    }
+
+    public static class MyTapBean {
+
+        private String tapped;
+
+        public void tapSomething(String body) throws Exception {
+            try {
+                LATCH.await(5, TimeUnit.SECONDS);
+                Thread.sleep(100);
+            } catch (Exception e) {
+                fail("Should not be interrupted");
+            }
+            LOG.info("Wire tapping: {}", body);
+            tapped = body;
+        }
+
+        public String getTapped() {
+            return tapped;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/e8a287f4/camel-core/src/test/java/org/apache/camel/processor/WireTapVoidBeanTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/WireTapVoidBeanTest.java b/camel-core/src/test/java/org/apache/camel/processor/WireTapVoidBeanTest.java
index 71b75e6..d690c0c 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/WireTapVoidBeanTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/WireTapVoidBeanTest.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.