You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/07/12 18:54:17 UTC
camel git commit: CAMEL-11539: WireTap - Add support for defer
shutdown if pending tasks are active
Repository: camel
Updated Branches:
refs/heads/master fb0ab0e96 -> e8a287f42
CAMEL-11539: WireTap - Add support for defer shutdown if pending tasks are active
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e8a287f4
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e8a287f4
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e8a287f4
Branch: refs/heads/master
Commit: e8a287f42ab5d180db151441513567b886889171
Parents: fb0ab0e
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Jul 12 20:18:06 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Jul 12 20:44:21 2017 +0200
----------------------------------------------------------------------
.../management/mbean/ManagedWireTapMBean.java | 3 +
.../mbean/ManagedWireTapProcessor.java | 4 +
.../camel/processor/WireTapProcessor.java | 26 +++++-
.../processor/WireTapShutdownBeanTest.java | 95 +++++++++++++++++++
.../processor/WireTapShutdownRouteTest.java | 98 ++++++++++++++++++++
.../camel/processor/WireTapVoidBeanTest.java | 6 +-
6 files changed, 228 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/e8a287f4/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java
index ac86d53..d86ede7 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java
@@ -35,6 +35,9 @@ public interface ManagedWireTapMBean extends ManagedProcessorMBean, ManagedExten
@ManagedAttribute(description = "Uses a copy of the original exchange")
Boolean isCopy();
+ @ManagedAttribute(description = "Current size of inflight wire tapped exchanges.")
+ Integer getTaskSize();
+
@ManagedOperation(description = "Statistics of the endpoints which has been sent to")
TabularData extendedInformation();
http://git-wip-us.apache.org/repos/asf/camel/blob/e8a287f4/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java
index ceaa0e2..8000799 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java
@@ -91,6 +91,10 @@ public class ManagedWireTapProcessor extends ManagedProcessor implements Managed
return processor.isCopy();
}
+ public Integer getTaskSize() {
+ return processor.getPendingExchangesSize();
+ }
+
@Override
public TabularData extendedInformation() {
try {
http://git-wip-us.apache.org/repos/asf/camel/blob/e8a287f4/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index c6df8f5..3bb8e52 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -21,6 +21,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
@@ -31,11 +33,13 @@ import org.apache.camel.ExchangePattern;
import org.apache.camel.Expression;
import org.apache.camel.Message;
import org.apache.camel.Processor;
+import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.StreamCache;
import org.apache.camel.Traceable;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ExchangeHelper;
@@ -49,7 +53,7 @@ import org.slf4j.LoggerFactory;
*
* @version
*/
-public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware, CamelContextAware {
+public class WireTapProcessor extends ServiceSupport implements AsyncProcessor, Traceable, ShutdownAware, IdAware, CamelContextAware {
private static final Logger LOG = LoggerFactory.getLogger(WireTapProcessor.class);
private String id;
private CamelContext camelContext;
@@ -59,6 +63,7 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor,
private final ExchangePattern exchangePattern;
private final ExecutorService executorService;
private volatile boolean shutdownExecutorService;
+ private final LongAdder taskCount = new LongAdder();
// expression or processor used for populating a new exchange to send
// as opposed to traditional wiretap that sends a copy of the original exchange
@@ -104,6 +109,22 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor,
this.camelContext = camelContext;
}
+ @Override
+ public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
+ // not in use
+ return true;
+ }
+
+ @Override
+ public int getPendingExchangesSize() {
+ return taskCount.intValue();
+ }
+
+ @Override
+ public void prepareShutdown(boolean suspendOnly, boolean forced) {
+ // noop
+ }
+
public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
return dynamicProcessor.getEndpointUtilizationStatistics();
}
@@ -132,11 +153,14 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor,
// send the exchange to the destination using an executor service
executorService.submit(new Callable<Exchange>() {
public Exchange call() throws Exception {
+ taskCount.increment();
try {
LOG.debug(">>>> (wiretap) {} {}", uri, wireTapExchange);
processor.process(wireTapExchange);
} catch (Throwable e) {
LOG.warn("Error occurred during processing " + wireTapExchange + " wiretap to " + uri + ". This exception will be ignored.", e);
+ } finally {
+ taskCount.decrement();
}
return wireTapExchange;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e8a287f4/camel-core/src/test/java/org/apache/camel/processor/WireTapShutdownBeanTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/WireTapShutdownBeanTest.java b/camel-core/src/test/java/org/apache/camel/processor/WireTapShutdownBeanTest.java
new file mode 100644
index 0000000..7e288ec
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/WireTapShutdownBeanTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.JndiRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Wire tap unit test
+ */
+public class WireTapShutdownBeanTest extends ContextTestSupport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(WireTapShutdownBeanTest.class);
+
+ private static final CountDownLatch LATCH = new CountDownLatch(1);
+
+ public void testWireTapShutdown() throws Exception {
+ final MyTapBean tapBean = (MyTapBean) context.getRegistry().lookupByName("tap");
+
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+
+ LATCH.countDown();
+
+ // shutdown Camel which should let the inlfight wire-tap message route to completion
+ context.stop();
+
+ // should allow to shutdown nicely
+ await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertEquals("Hello World", tapBean.getTapped());
+ });
+ }
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry jndi = super.createRegistry();
+ jndi.bind("tap", new MyTapBean());
+ return jndi;
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:start")
+ .wireTap("bean:tap").dynamicUri(false)
+ .to("mock:result");
+ }
+ };
+ }
+
+ public static class MyTapBean {
+
+ private String tapped;
+
+ public void tapSomething(String body) throws Exception {
+ try {
+ LATCH.await(5, TimeUnit.SECONDS);
+ Thread.sleep(100);
+ } catch (Exception e) {
+ fail("Should not be interrupted");
+ }
+ LOG.info("Wire tapping: {}", body);
+ tapped = body;
+ }
+
+ public String getTapped() {
+ return tapped;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/e8a287f4/camel-core/src/test/java/org/apache/camel/processor/WireTapShutdownRouteTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/WireTapShutdownRouteTest.java b/camel-core/src/test/java/org/apache/camel/processor/WireTapShutdownRouteTest.java
new file mode 100644
index 0000000..a008dc5
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/WireTapShutdownRouteTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.JndiRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Wire tap unit test
+ */
+public class WireTapShutdownRouteTest extends ContextTestSupport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(WireTapShutdownRouteTest.class);
+
+ private static final CountDownLatch LATCH = new CountDownLatch(1);
+
+ public void testWireTapShutdown() throws Exception {
+ final MyTapBean tapBean = (MyTapBean) context.getRegistry().lookupByName("tap");
+
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+
+ LATCH.countDown();
+
+ // shutdown Camel which should let the inlfight wire-tap message route to completion
+ context.stop();
+
+ // should allow to shutdown nicely
+ await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertEquals("Hello World", tapBean.getTapped());
+ });
+ }
+
+ @Override
+ protected JndiRegistry createRegistry() throws Exception {
+ JndiRegistry jndi = super.createRegistry();
+ jndi.bind("tap", new MyTapBean());
+ return jndi;
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from("direct:start").routeId("foo")
+ .wireTap("direct:tap")
+ .to("mock:result");
+
+ from("direct:tap").routeId("bar")
+ .to("bean:tap");
+ }
+ };
+ }
+
+ public static class MyTapBean {
+
+ private String tapped;
+
+ public void tapSomething(String body) throws Exception {
+ try {
+ LATCH.await(5, TimeUnit.SECONDS);
+ Thread.sleep(100);
+ } catch (Exception e) {
+ fail("Should not be interrupted");
+ }
+ LOG.info("Wire tapping: {}", body);
+ tapped = body;
+ }
+
+ public String getTapped() {
+ return tapped;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/e8a287f4/camel-core/src/test/java/org/apache/camel/processor/WireTapVoidBeanTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/WireTapVoidBeanTest.java b/camel-core/src/test/java/org/apache/camel/processor/WireTapVoidBeanTest.java
index 71b75e6..d690c0c 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/WireTapVoidBeanTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/WireTapVoidBeanTest.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.