You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/07/20 22:47:25 UTC

[4/7] camel git commit: CAMEL-8965: Add mbean for wiretap

CAMEL-8965: Add mbean for wiretap


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4376cb3d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4376cb3d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4376cb3d

Branch: refs/heads/master
Commit: 4376cb3db8380bf8d21a60ec3910763a85b1e74a
Parents: b269ae5
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Jul 20 17:55:10 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Jul 20 22:54:08 2015 +0200

----------------------------------------------------------------------
 .../mbean/ManagedSendDynamicProcessorMBean.java |  2 +-
 .../management/mbean/ManagedWireTapMBean.java   | 35 ++++++++
 .../DefaultManagementObjectStrategy.java        |  8 +-
 .../mbean/ManagedWireTapProcessor.java          | 70 +++++++++++++++
 .../apache/camel/model/EnrichDefinition.java    |  4 +-
 .../camel/model/PollEnrichDefinition.java       |  2 +-
 .../apache/camel/model/WireTapDefinition.java   | 67 +++++++++++++++
 .../camel/processor/WireTapProcessor.java       | 23 +++++
 .../camel/management/ManagedWireTapTest.java    | 90 ++++++++++++++++++++
 9 files changed, 295 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/4376cb3d/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSendDynamicProcessorMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSendDynamicProcessorMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSendDynamicProcessorMBean.java
index e1071ceb..9a6ae94 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSendDynamicProcessorMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedSendDynamicProcessorMBean.java
@@ -26,7 +26,7 @@ public interface ManagedSendDynamicProcessorMBean extends ManagedProcessorMBean
     @ManagedAttribute(description = "Message Exchange Pattern")
     String getMessageExchangePattern();
 
-    @ManagedAttribute(description = "Sets the maximum size used by the ProducerCacheN which is used to cache and reuse producers.")
+    @ManagedAttribute(description = "Sets the maximum size used by the ProducerCache which is used to cache and reuse producers")
     Integer getCacheSize();
 
     @ManagedAttribute(description = "Ignore the invalidate endpoint exception when try to create a producer with that endpoint")

http://git-wip-us.apache.org/repos/asf/camel/blob/4376cb3d/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java
new file mode 100644
index 0000000..52663cc
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedWireTapMBean.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.api.management.mbean;
+
+import org.apache.camel.api.management.ManagedAttribute;
+
+public interface ManagedWireTapMBean extends ManagedProcessorMBean {
+
+    @ManagedAttribute(description = "Expression that returns the uri to use for the wire tap destination", mask = true)
+    String getExpression();
+
+    @ManagedAttribute(description = "Sets the maximum size used by the ProducerCache which is used to cache and reuse producers")
+    Integer getCacheSize();
+
+    @ManagedAttribute(description = "Ignore the invalidate endpoint exception when try to create a producer with that endpoint")
+    Boolean isIgnoreInvalidEndpoint();
+
+    @ManagedAttribute(description = "Uses a copy of the original exchange")
+    Boolean isCopy();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/4376cb3d/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
index a8df7a9..d56f24c 100644
--- a/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
+++ b/camel-core/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java
@@ -38,7 +38,6 @@ import org.apache.camel.management.mbean.ManagedCamelContext;
 import org.apache.camel.management.mbean.ManagedComponent;
 import org.apache.camel.management.mbean.ManagedConsumer;
 import org.apache.camel.management.mbean.ManagedDelayer;
-import org.apache.camel.management.mbean.ManagedSendDynamicProcessor;
 import org.apache.camel.management.mbean.ManagedEndpoint;
 import org.apache.camel.management.mbean.ManagedErrorHandler;
 import org.apache.camel.management.mbean.ManagedEventNotifier;
@@ -47,20 +46,23 @@ import org.apache.camel.management.mbean.ManagedProcessor;
 import org.apache.camel.management.mbean.ManagedProducer;
 import org.apache.camel.management.mbean.ManagedRoute;
 import org.apache.camel.management.mbean.ManagedScheduledPollConsumer;
+import org.apache.camel.management.mbean.ManagedSendDynamicProcessor;
 import org.apache.camel.management.mbean.ManagedSendProcessor;
 import org.apache.camel.management.mbean.ManagedService;
 import org.apache.camel.management.mbean.ManagedSuspendableRoute;
 import org.apache.camel.management.mbean.ManagedThreadPool;
 import org.apache.camel.management.mbean.ManagedThrottler;
 import org.apache.camel.management.mbean.ManagedThroughputLogger;
+import org.apache.camel.management.mbean.ManagedWireTapProcessor;
 import org.apache.camel.model.ModelCamelContext;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.processor.Delayer;
-import org.apache.camel.processor.SendDynamicProcessor;
 import org.apache.camel.processor.ErrorHandler;
+import org.apache.camel.processor.SendDynamicProcessor;
 import org.apache.camel.processor.SendProcessor;
 import org.apache.camel.processor.Throttler;
 import org.apache.camel.processor.ThroughputLogger;
+import org.apache.camel.processor.WireTapProcessor;
 import org.apache.camel.processor.aggregate.AggregateProcessor;
 import org.apache.camel.processor.idempotent.IdempotentConsumer;
 import org.apache.camel.spi.BrowsableEndpoint;
@@ -183,6 +185,8 @@ public class DefaultManagementObjectStrategy implements ManagementObjectStrategy
                 answer = new ManagedDelayer(context, (Delayer) target, definition);
             } else if (target instanceof Throttler) {
                 answer = new ManagedThrottler(context, (Throttler) target, definition);
+            } else if (target instanceof WireTapProcessor) {
+                answer = new ManagedWireTapProcessor(context, (WireTapProcessor) target, definition);
             } else if (target instanceof SendDynamicProcessor) {
                 answer = new ManagedSendDynamicProcessor(context, (SendDynamicProcessor) target, definition);
             } else if (target instanceof SendProcessor) {

http://git-wip-us.apache.org/repos/asf/camel/blob/4376cb3d/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java
new file mode 100644
index 0000000..60471fd
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedWireTapProcessor.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management.mbean;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.api.management.mbean.ManagedWireTapMBean;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.processor.WireTapProcessor;
+import org.apache.camel.spi.ManagementStrategy;
+import org.apache.camel.util.URISupport;
+
+/**
+ * @version 
+ */
+@ManagedResource(description = "Managed WireTapProcessor")
+public class ManagedWireTapProcessor extends ManagedProcessor implements ManagedWireTapMBean {
+    private final WireTapProcessor processor;
+    private String uri;
+
+    public ManagedWireTapProcessor(CamelContext context, WireTapProcessor processor, ProcessorDefinition<?> definition) {
+        super(context, processor, definition);
+        this.processor = processor;
+    }
+
+    public void init(ManagementStrategy strategy) {
+        super.init(strategy);
+        boolean sanitize = strategy.getManagementAgent().getMask() != null ? strategy.getManagementAgent().getMask() : false;
+        if (sanitize) {
+            uri = URISupport.sanitizeUri(processor.getExpression().toString());
+        } else {
+            uri = processor.getExpression().toString();
+        }
+    }
+
+    public WireTapProcessor getProcessor() {
+        return processor;
+    }
+
+    public String getExpression() {
+        return uri;
+    }
+
+    public Integer getCacheSize() {
+        return processor.getCacheSize();
+    }
+
+    public Boolean isIgnoreInvalidEndpoint() {
+        return processor.isIgnoreInvalidEndpoint();
+    }
+
+    public Boolean isCopy() {
+        return processor.isCopy();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/4376cb3d/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
index 1d05f60..6143f17 100644
--- a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
@@ -182,8 +182,8 @@ public class EnrichDefinition extends NoOutputExpressionNode {
     }
 
     /**
-     * Sets the maximum size used by the {@link org.apache.camel.impl.ConsumerCache} which is used
-     * to cache and reuse consumers when using this pollEnrich, when uris are reused.
+     * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used
+     * to cache and reuse producers when uris are reused.
      *
      * @param cacheSize  the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
      * @return the builder

http://git-wip-us.apache.org/repos/asf/camel/blob/4376cb3d/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
index f200051..365cc81 100644
--- a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
@@ -196,7 +196,7 @@ public class PollEnrichDefinition extends NoOutputExpressionNode {
 
     /**
      * Sets the maximum size used by the {@link org.apache.camel.impl.ConsumerCache} which is used
-     * to cache and reuse consumers when using this pollEnrich, when uris are reused.
+     * to cache and reuse consumers when uris are reused.
      *
      * @param cacheSize  the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
      * @return the builder

http://git-wip-us.apache.org/repos/asf/camel/blob/4376cb3d/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java b/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
index 64e339e..22c4941 100644
--- a/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
@@ -30,6 +30,7 @@ import javax.xml.bind.annotation.XmlTransient;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
+import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.processor.CamelInternalProcessor;
 import org.apache.camel.processor.SendDynamicProcessor;
 import org.apache.camel.processor.WireTapProcessor;
@@ -59,9 +60,13 @@ public class WireTapDefinition extends NoOutputExpressionNode implements Executo
     @XmlAttribute @Metadata(defaultValue = "true")
     private Boolean copy;
     @XmlAttribute
+    private Integer cacheSize;
+    @XmlAttribute
     private String onPrepareRef;
     @XmlTransient
     private Processor onPrepare;
+    @XmlAttribute
+    private Boolean ignoreInvalidEndpoint;
 
     public WireTapDefinition() {
     }
@@ -75,6 +80,12 @@ public class WireTapDefinition extends NoOutputExpressionNode implements Executo
         // create the send dynamic producer to send to the wire tapped endpoint
         SendDynamicProcessor dynamicTo = new SendDynamicProcessor(getExpression());
         dynamicTo.setCamelContext(routeContext.getCamelContext());
+        if (cacheSize != null) {
+            dynamicTo.setCacheSize(cacheSize);
+        }
+        if (ignoreInvalidEndpoint != null) {
+            dynamicTo.setIgnoreInvalidEndpoint(ignoreInvalidEndpoint);
+        }
 
         // create error handler we need to use for processing the wire tapped
         Processor target = wrapInErrorHandler(routeContext, dynamicTo);
@@ -109,6 +120,12 @@ public class WireTapDefinition extends NoOutputExpressionNode implements Executo
         if (onPrepare != null) {
             answer.setOnPrepare(onPrepare);
         }
+        if (cacheSize != null) {
+            answer.setCacheSize(cacheSize);
+        }
+        if (ignoreInvalidEndpoint != null) {
+            answer.setIgnoreInvalidEndpoint(ignoreInvalidEndpoint);
+        }
 
         return answer;
     }
@@ -261,6 +278,40 @@ public class WireTapDefinition extends NoOutputExpressionNode implements Executo
         return this;
     }
 
+    /**
+     * Sets the maximum size used by the {@link org.apache.camel.impl.ProducerCache} which is used
+     * to cache and reuse producers, when uris are reused.
+     *
+     * @param cacheSize  the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off.
+     * @return the builder
+     */
+    public WireTapDefinition cacheSize(int cacheSize) {
+        setCacheSize(cacheSize);
+        return this;
+    }
+
+    /**
+     * Ignore the invalid endpoint exception when trying to create a producer with that endpoint
+     *
+     * @return the builder
+     */
+    public WireTapDefinition ignoreInvalidEndpoint() {
+        setIgnoreInvalidEndpoint(true);
+        return this;
+    }
+
+    // Properties
+    //-------------------------------------------------------------------------
+
+    /**
+     * Expression that returns the uri to use for the wire tap destination
+     */
+    @Override
+    public void setExpression(ExpressionDefinition expression) {
+        // override to include javadoc describing what the expression is used for
+        super.setExpression(expression);
+    }
+
     public Processor getNewExchangeProcessor() {
         return newExchangeProcessor;
     }
@@ -345,4 +396,20 @@ public class WireTapDefinition extends NoOutputExpressionNode implements Executo
     public void setHeaders(List<SetHeaderDefinition> headers) {
         this.headers = headers;
     }
+
+    public Integer getCacheSize() {
+        return cacheSize;
+    }
+
+    public void setCacheSize(Integer cacheSize) {
+        this.cacheSize = cacheSize;
+    }
+
+    public Boolean getIgnoreInvalidEndpoint() {
+        return ignoreInvalidEndpoint;
+    }
+
+    public void setIgnoreInvalidEndpoint(Boolean ignoreInvalidEndpoint) {
+        this.ignoreInvalidEndpoint = ignoreInvalidEndpoint;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/4376cb3d/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index 87a3365..fc09ea8 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -57,6 +57,9 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor,
     private final ExchangePattern exchangePattern;
     private final ExecutorService executorService;
     private volatile boolean shutdownExecutorService;
+    // only used for management to be able to report the setting
+    private int cacheSize;
+    private boolean ignoreInvalidEndpoint;
 
     // expression or processor used for populating a new exchange to send
     // as opposed to traditional wiretap that sends a copy of the original exchange
@@ -204,6 +207,10 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor,
         return new DefaultExchange(exchange.getFromEndpoint(), ExchangePattern.InOnly);
     }
 
+    public Expression getExpression() {
+        return expression;
+    }
+
     public List<Processor> getNewExchangeProcessors() {
         return newExchangeProcessors;
     }
@@ -243,6 +250,22 @@ public class WireTapProcessor extends ServiceSupport implements AsyncProcessor,
         this.onPrepare = onPrepare;
     }
 
+    public int getCacheSize() {
+        return cacheSize;
+    }
+
+    public void setCacheSize(int cacheSize) {
+        this.cacheSize = cacheSize;
+    }
+
+    public boolean isIgnoreInvalidEndpoint() {
+        return ignoreInvalidEndpoint;
+    }
+
+    public void setIgnoreInvalidEndpoint(boolean ignoreInvalidEndpoint) {
+        this.ignoreInvalidEndpoint = ignoreInvalidEndpoint;
+    }
+
     @Override
     protected void doStart() throws Exception {
         ServiceHelper.startService(processor);

http://git-wip-us.apache.org/repos/asf/camel/blob/4376cb3d/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java
new file mode 100644
index 0000000..31e0a37
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedWireTapTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.management;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.openmbean.TabularData;
+
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version 
+ */
+public class ManagedWireTapTest extends ManagementTestSupport {
+
+    public void testManageWireTap() throws Exception {
+        // JMX tests don't work well on AIX CI servers (they hang the servers)
+        if (isPlatform("aix")) {
+            return;
+        }
+
+        MockEndpoint foo = getMockEndpoint("mock:foo");
+        foo.expectedMessageCount(1);
+
+        template.sendBodyAndHeader("direct:start", "Hello World", "whereto", "foo");
+
+        assertMockEndpointsSatisfied();
+
+        // get the stats for the route
+        MBeanServer mbeanServer = getMBeanServer();
+
+        // get the object name for the wire tap processor
+        ObjectName on = ObjectName.getInstance("org.apache.camel:context=camel-1,type=processors,name=\"mysend\"");
+
+        // should be on route1
+        String routeId = (String) mbeanServer.getAttribute(on, "RouteId");
+        assertEquals("route1", routeId);
+
+        String camelId = (String) mbeanServer.getAttribute(on, "CamelId");
+        assertEquals("camel-1", camelId);
+
+        String state = (String) mbeanServer.getAttribute(on, "State");
+        assertEquals(ServiceStatus.Started.name(), state);
+
+        String uri = (String) mbeanServer.getAttribute(on, "Expression");
+        assertEquals("simple{direct:${header.whereto}}", uri);
+
+        TabularData data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{false}, new String[]{"boolean"});
+        assertNotNull(data);
+        assertEquals(2, data.size());
+
+        data = (TabularData) mbeanServer.invoke(on, "explain", new Object[]{true}, new String[]{"boolean"});
+        assertNotNull(data);
+        assertEquals(10, data.size());
+
+        String json = (String) mbeanServer.invoke(on, "informationJson", null, null);
+        assertNotNull(json);
+        assertTrue(json.contains("\"description\": \"Routes a copy of a message (or creates a new message) to a secondary destination while continue routing the original message"));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .wireTap("direct:${header.whereto}").id("mysend");
+
+                from("direct:foo").to("mock:foo");
+            }
+        };
+    }
+
+}