You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/07/20 22:47:25 UTC
[4/7] camel git commit: CAMEL-8965: Add mbean for wiretap
CAMEL-8965: Add mbean for wiretap
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4376cb3d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4376cb3d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4376cb3d
Branch: refs/heads/master
Commit: 4376cb3db8380bf8d21a60ec3910763a85b1e74a
Parents: b269ae5
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jul 20 17:55:10 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jul 20 22:54:08 2015 +0200
----------------------------------------------------------------------
.../mbean/ManagedSendDynamicProcessorMBean.java | 2 +-
.../management/mbean/ManagedWireTapMBean.java | 35 ++++++++
.../DefaultManagementObjectStrategy.java | 8 +-
.../mbean/ManagedWireTapProcessor.java | 70 +++++++++++++++
.../apache/camel/model/EnrichDefinition.java | 4 +-
.../camel/model/PollEnrichDefinition.java | 2 +-
.../apache/camel/model/WireTapDefinition.java | 67 +++++++++++++++
.../camel/processor/WireTapProcessor.java | 23 +++++
.../camel/management/ManagedWireTapTest.java | 90 ++++++++++++++++++++
9 files changed, 295 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/4376cb3d/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSendDynamicProcessorMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSendDynamicProcessorMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSendDynamicProcessorMBean.java
index e1071ceb..9a6ae94 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSendDynamicProcessorMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSendDynamicProcessorMBean.java
@@ -26,7 +26,7 @@ public interface ManagedSendDynamicProcessorMBean extends ManagedProcessorMBean
@ManagedAttribute(description = "Message Exchange Pattern")
String getMessageExchangePattern();
- @ManagedAttribute(description = "Sets the maximum size used by the ProducerCacheN which is used to cache and reuse producers.")
+ @ManagedAttribute(description = "Sets the maximum size used by the ProducerCache which is used to cache and reuse producers")
Integer getCacheSize();
@ManagedAttribute(description = "Ignore the invalidate endpoint exception when try to create a producer with that endpoint")
http://git-wip-us.apache.org/repos/asf/camel/blob/4376cb3d/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java
new file mode 100644
index 0000000..52663cc
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.api.management.mbean;
+
+import org.apache.camel.api.management.ManagedAttribute;
+
+public interface ManagedWireTapMBean extends ManagedProcessorMBean {
+
+ @ManagedAttribute(description = "Expression that returns the uri to use for the wire tap destination", mask = true)
+ String getExpression();
+
+ @ManagedAttribute(description = "Sets the maximum size used by the ProducerCache which is used to cache and reuse producers")
+ Integer getCacheSize();
+
+ @ManagedAttribute(description = "Ignore the invalidate endpoint exception when try to create a producer with that endpoint")
+ Boolean isIgnoreInvalidEndpoint();
+
+ @ManagedAttribute(description = "Uses a copy of the original exchange")
+ Boolean isCopy();
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/4376cb3d/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
index a8df7a9..d56f24c 100644
--- a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
@@ -38,7 +38,6 @@ import org.apache.camel.management.mbean.ManagedCamelContext;
import org.apache.camel.management.mbean.ManagedComponent;
import org.apache.camel.management.mbean.ManagedConsumer;
import org.apache.camel.management.mbean.ManagedDelayer;
-import org.apache.camel.management.mbean.ManagedSendDynamicProcessor;
import org.apache.camel.management.mbean.ManagedEndpoint;
import org.apache.camel.management.mbean.ManagedErrorHandler;
import org.apache.camel.management.mbean.ManagedEventNotifier;
@@ -47,20 +46,23 @@ import org.apache.camel.management.mbean.ManagedProcessor;
import org.apache.camel.management.mbean.ManagedProducer;
import org.apache.camel.management.mbean.ManagedRoute;
import org.apache.camel.management.mbean.ManagedScheduledPollConsumer;
+import org.apache.camel.management.mbean.ManagedSendDynamicProcessor;
import org.apache.camel.management.mbean.ManagedSendProcessor;
import org.apache.camel.management.mbean.ManagedService;
import org.apache.camel.management.mbean.ManagedSuspendableRoute;
import org.apache.camel.management.mbean.ManagedThreadPool;
import org.apache.camel.management.mbean.ManagedThrottler;
import org.apache.camel.management.mbean.ManagedThroughputLogger;
+import org.apache.camel.management.mbean.ManagedWireTapProcessor;
import org.apache.camel.model.ModelCamelContext;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.processor.Delayer;
-import org.apache.camel.processor.SendDynamicProcessor;
import org.apache.camel.processor.ErrorHandler;
+import org.apache.camel.processor.SendDynamicProcessor;
import org.apache.camel.processor.SendProcessor;
import org.apache.camel.processor.Throttler;
import org.apache.camel.processor.ThroughputLogger;
+import org.apache.camel.processor.WireTapProcessor;
import org.apache.camel.processor.aggregate.AggregateProcessor;
import org.apache.camel.processor.idempotent.IdempotentConsumer;
import org.apache.camel.spi.BrowsableEndpoint;
@@ -183,6 +185,8 @@ public class DefaultManagementObjectStrategy implements ManagementObjectStrategy
answer = new ManagedDelayer(context, (Delayer) target, definition);
} else if (target instanceof Throttler) {
answer = new ManagedThrottler(context, (Throttler) target, definition);
+ } else if (target instanceof WireTapProcessor) {
+ answer = new ManagedWireTapProcessor(context, (WireTapProcessor) target, definition);
} else if (target instanceof SendDynamicProcessor) {
answer = new ManagedSendDynamicProcessor(context, (SendDynamicProcessor) target, definition);
} else if (target instanceof SendProcessor) {
http://git-wip-us.apache.org/repos/asf/camel/blob/4376cb3d/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java
new file mode 100644
index 0000000..60471fd
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management.mbean;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.api.management.mbean.ManagedWireTapMBean;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.processor.WireTapProcessor;
+import org.apache.camel.spi.ManagementStrategy;
+import org.apache.camel.util.URISupport;
+
+/**
+ * @version
+ */
+@ManagedResource(description = "Managed WireTapProcessor")
+public class ManagedWireTapProcessor extends ManagedProcessor implements ManagedWireTapMBean {
+ private final WireTapProcessor processor;
+ private String uri;
+
+ public ManagedWireTapProcessor(CamelContext context, WireTapProcessor processor, ProcessorDefinition<?> definition) {
+ super(context, processor, definition);
+ this.processor = processor;
+ }
+
+ public void init(ManagementStrategy strategy) {
+ super.init(strategy);
+ boolean sanitize = strategy.getManagementAgent().getMask() != null ? strategy.getManagementAgent().getMask() : false;
+ if (sanitize) {
+ uri = URISupport.sanitizeUri(processor.getExpression().toString());
+ } else {
+ uri = processor.getExpression().toString();
+ }
+ }
+
+ public WireTapProcessor getProcessor() {
+ return processor;
+ }
+
+ public String getExpression() {
+ return uri;
+ }
+
+ public Integer getCacheSize() {
+ return processor.getCacheSize();
+ }
+
+ public Boolean isIgnoreInvalidEndpoint() {
+ return processor.isIgnoreInvalidEndpoint();
+ }
+
+ public Boolean isCopy() {
+ return processor.isCopy();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/4376cb3d/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
index 1d05f60..6143f17 100644
--- a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
@@ -182,8 +182,8 @@ public class EnrichDefinition extends NoOutputExpressionNode {
}
/**
- * Sets the maximum size used by the {@link org.apache.camel.impl.ConsumerCache} which is used
- * to cache and reuse consumers when using this pollEnrich, when uris are reused.
+ * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used
+ * to cache and reuse producers when uris are reused.
*
* @param cacheSize the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
* @return the builder
http://git-wip-us.apache.org/repos/asf/camel/blob/4376cb3d/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
index f200051..365cc81 100644
--- a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
@@ -196,7 +196,7 @@ public class PollEnrichDefinition extends NoOutputExpressionNode {
/**
* Sets the maximum size used by the {@link org.apache.camel.impl.ConsumerCache} which is used
- * to cache and reuse consumers when using this pollEnrich, when uris are reused.
+ * to cache and reuse consumers when uris are reused.
*
* @param cacheSize the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
* @return the builder
http://git-wip-us.apache.org/repos/asf/camel/blob/4376cb3d/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java b/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
index 64e339e..22c4941 100644
--- a/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
@@ -30,6 +30,7 @@ import javax.xml.bind.annotation.XmlTransient;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
+import org.apache.camel.model.language.ExpressionDefinition;
import org.apache.camel.processor.CamelInternalProcessor;
import org.apache.camel.processor.SendDynamicProcessor;
import org.apache.camel.processor.WireTapProcessor;
@@ -59,9 +60,13 @@ public class WireTapDefinition extends NoOutputExpressionNode implements Executo
@XmlAttribute @Metadata(defaultValue = "true")
private Boolean copy;
@XmlAttribute
+ private Integer cacheSize;
+ @XmlAttribute
private String onPrepareRef;
@XmlTransient
private Processor onPrepare;
+ @XmlAttribute
+ private Boolean ignoreInvalidEndpoint;
public WireTapDefinition() {
}
@@ -75,6 +80,12 @@ public class WireTapDefinition extends NoOutputExpressionNode implements Executo
// create the send dynamic producer to send to the wire tapped endpoint
SendDynamicProcessor dynamicTo = new SendDynamicProcessor(getExpression());
dynamicTo.setCamelContext(routeContext.getCamelContext());
+ if (cacheSize != null) {
+ dynamicTo.setCacheSize(cacheSize);
+ }
+ if (ignoreInvalidEndpoint != null) {
+ dynamicTo.setIgnoreInvalidEndpoint(ignoreInvalidEndpoint);
+ }
// create error handler we need to use for processing the wire tapped
Processor target = wrapInErrorHandler(routeContext, dynamicTo);
@@ -109,6 +120,12 @@ public class WireTapDefinition extends NoOutputExpressionNode implements Executo
if (onPrepare != null) {
answer.setOnPrepare(onPrepare);
}
+ if (cacheSize != null) {
+ answer.setCacheSize(cacheSize);
+ }
+ if (ignoreInvalidEndpoint != null) {
+ answer.setIgnoreInvalidEndpoint(ignoreInvalidEndpoint);
+ }
return answer;
}
@@ -261,6 +278,40 @@ public class WireTapDefinition extends NoOutputExpressionNode implements Executo
return this;
}
+ /**
+ * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used
+ * to cache and reuse producers, when uris are reused.
+ *
+ * @param cacheSize the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
+ * @return the builder
+ */
+ public WireTapDefinition cacheSize(int cacheSize) {
+ setCacheSize(cacheSize);
+ return this;
+ }
+
+ /**
+ * Ignore the invalid endpoint exception when trying to create a producer with that endpoint
+ *
+ * @return the builder
+ */
+ public WireTapDefinition ignoreInvalidEndpoint() {
+ setIgnoreInvalidEndpoint(true);
+ return this;
+ }
+
+ // Properties
+ //-------------------------------------------------------------------------
+
+ /**
+ * Expression that returns the uri to use for the wire tap destination
+ */
+ @Override
+ public void setExpression(ExpressionDefinition expression) {
+ // override to include javadoc what the expression is used for
+ super.setExpression(expression);
+ }
+
public Processor getNewExchangeProcessor() {
return newExchangeProcessor;
}
@@ -345,4 +396,20 @@ public class WireTapDefinition extends NoOutputExpressionNode implements Executo
public void setHeaders(List<SetHeaderDefinition> headers) {
this.headers = headers;
}
+
+ public Integer getCacheSize() {
+ return cacheSize;
+ }
+
+ public void setCacheSize(Integer cacheSize) {
+ this.cacheSize = cacheSize;
+ }
+
+ public Boolean getIgnoreInvalidEndpoint() {
+ return ignoreInvalidEndpoint;
+ }
+
+ public void setIgnoreInvalidEndpoint(Boolean ignoreInvalidEndpoint) {
+ this.ignoreInvalidEndpoint = ignoreInvalidEndpoint;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/4376cb3d/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index 87a3365..fc09ea8 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -57,6 +57,9 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor,
private final ExchangePattern exchangePattern;
private final ExecutorService executorService;
private volatile boolean shutdownExecutorService;
+ // only used for management to be able to report the setting
+ private int cacheSize;
+ private boolean ignoreInvalidEndpoint;
// expression or processor used for populating a new exchange to send
// as opposed to traditional wiretap that sends a copy of the original exchange
@@ -204,6 +207,10 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor,
return new DefaultExchange(exchange.getFromEndpoint(), ExchangePattern.InOnly);
}
+ public Expression getExpression() {
+ return expression;
+ }
+
public List<Processor> getNewExchangeProcessors() {
return newExchangeProcessors;
}
@@ -243,6 +250,22 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor,
this.onPrepare = onPrepare;
}
+ public int getCacheSize() {
+ return cacheSize;
+ }
+
+ public void setCacheSize(int cacheSize) {
+ this.cacheSize = cacheSize;
+ }
+
+ public boolean isIgnoreInvalidEndpoint() {
+ return ignoreInvalidEndpoint;
+ }
+
+ public void setIgnoreInvalidEndpoint(boolean ignoreInvalidEndpoint) {
+ this.ignoreInvalidEndpoint = ignoreInvalidEndpoint;
+ }
+
@Override
protected void doStart() throws Exception {
ServiceHelper.startService(processor);
http://git-wip-us.apache.org/repos/asf/camel/blob/4376cb3d/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java
new file mode 100644
index 0000000..31e0a37
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.openmbean.TabularData;
+
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version
+ */
+public class ManagedWireTapTest extends ManagementTestSupport {
+
+ public void testManageWireTap() throws Exception {
+ // JMX tests don't work well on AIX CI servers (hangs them)
+ if (isPlatform("aix")) {
+ return;
+ }
+
+ MockEndpoint foo = getMockEndpoint("mock:foo");
+ foo.expectedMessageCount(1);
+
+ template.sendBodyAndHeader("direct:start", "Hello World", "whereto", "foo");
+
+ assertMockEndpointsSatisfied();
+
+ // get the MBean server used to query the processor's attributes
+ MBeanServer mbeanServer = getMBeanServer();
+
+ // get the object name for the wire tap processor
+ ObjectName on = ObjectName.getInstance("org.apache.camel:context=camel-1,type=processors,name=\"mysend\"");
+
+ // should be on route1
+ String routeId = (String) mbeanServer.getAttribute(on, "RouteId");
+ assertEquals("route1", routeId);
+
+ String camelId = (String) mbeanServer.getAttribute(on, "CamelId");
+ assertEquals("camel-1", camelId);
+
+ String state = (String) mbeanServer.getAttribute(on, "State");
+ assertEquals(ServiceStatus.Started.name(), state);
+
+ String uri = (String) mbeanServer.getAttribute(on, "Expression");
+ assertEquals("simple{direct:${header.whereto}}", uri);
+
+ TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"});
+ assertNotNull(data);
+ assertEquals(2, data.size());
+
+ data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{true}, new String[]{"boolean"});
+ assertNotNull(data);
+ assertEquals(10, data.size());
+
+ String json = (String) mbeanServer.invoke(on, "informationJson", null, null);
+ assertNotNull(json);
+ assertTrue(json.contains("\"description\": \"Routes a copy of a message (or creates a new message) to a secondary destination while continue routing the original message"));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .wireTap("direct:${header.whereto}").id("mysend");
+
+ from("direct:foo").to("mock:foo");
+ }
+ };
+ }
+
+}