You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2014/08/14 08:55:32 UTC

[1/3] git commit: CAMEL-7696: camel-metrics - Add a route policy to expose route stats as codahale metrics. Work in progress.

Repository: camel
Updated Branches:
  refs/heads/master b2c3b31bb -> c672d44b4


CAMEL-7696: camel-metrics - Add a route policy to expose route stats as codahale metrics. Work in progress.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c672d44b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c672d44b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c672d44b

Branch: refs/heads/master
Commit: c672d44b4f46a9a128216759059b134fc6f793ab
Parents: fa7e225
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Aug 14 08:55:11 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Aug 14 08:55:22 2014 +0200

----------------------------------------------------------------------
 .../routepolicy/MetricsRegistryService.java     |  64 ++++++++++
 .../metrics/routepolicy/MetricsRoutePolicy.java | 124 +++++++++++++++++++
 .../routepolicy/MetricsRoutePolicyTest.java     |  65 ++++++++++
 3 files changed, 253 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c672d44b/components/camel-metrics/src/main/java/org/apache/camel/component/metrics/routepolicy/MetricsRegistryService.java
----------------------------------------------------------------------
diff --git a/components/camel-metrics/src/main/java/org/apache/camel/component/metrics/routepolicy/MetricsRegistryService.java b/components/camel-metrics/src/main/java/org/apache/camel/component/metrics/routepolicy/MetricsRegistryService.java
new file mode 100644
index 0000000..efa3663
--- /dev/null
+++ b/components/camel-metrics/src/main/java/org/apache/camel/component/metrics/routepolicy/MetricsRegistryService.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.metrics.routepolicy;
+
+import javax.management.MBeanServer;
+
+import com.codahale.metrics.JmxReporter;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.support.ServiceSupport;
+
+/** Service holding the {@link MetricRegistry} of a CamelContext, which is reported in JMX when a MBeanServer is available. */
+public final class MetricsRegistryService extends ServiceSupport implements CamelContextAware {
+    private CamelContext camelContext;
+    private MetricRegistry registry;
+    private JmxReporter reporter;
+
+    public MetricRegistry getRegistry() {
+        return registry; // created in doStart, so null until this service has been started
+    }
+
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        registry = new MetricRegistry();
+        org.apache.camel.spi.ManagementAgent agent = getCamelContext().getManagementStrategy().getManagementAgent();
+        MBeanServer server = agent != null ? agent.getMBeanServer() : null; // the agent may be absent (eg JMX disabled), then no reporter is started
+        if (server != null) {
+            String domain = "org.apache.camel.metrics." + getCamelContext().getManagementName();
+            reporter = JmxReporter.forRegistry(registry).registerWith(server).inDomain(domain).build();
+            reporter.start();
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (reporter != null) {
+            reporter.stop();
+            reporter = null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/c672d44b/components/camel-metrics/src/main/java/org/apache/camel/component/metrics/routepolicy/MetricsRoutePolicy.java
----------------------------------------------------------------------
diff --git a/components/camel-metrics/src/main/java/org/apache/camel/component/metrics/routepolicy/MetricsRoutePolicy.java b/components/camel-metrics/src/main/java/org/apache/camel/component/metrics/routepolicy/MetricsRoutePolicy.java
new file mode 100644
index 0000000..f194ccb
--- /dev/null
+++ b/components/camel-metrics/src/main/java/org/apache/camel/component/metrics/routepolicy/MetricsRoutePolicy.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.metrics.routepolicy;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Route;
+import org.apache.camel.impl.RoutePolicySupport;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * A {@link org.apache.camel.spi.RoutePolicy} which gathers statistics and reports them using {@link com.codahale.metrics.MetricRegistry}.
+ * <p/>
+ * The metrics are reported in JMX by default (configuring this is still a TODO). One policy instance can be shared by many routes.
+ */
+public class MetricsRoutePolicy extends RoutePolicySupport {
+
+    // TODO: allow to configure which counters/meters/timers to capture
+    // TODO: allow to configure the reporter and jmx domain etc on MetricsRegistryService
+    // TODO: RoutePolicyFactory to make this configurable once and apply automatic for all routes
+    // TODO: allow to lookup and get hold of com.codahale.metrics.MetricRegistry from java api
+
+    private MetricsRegistryService registry;
+    private final ConcurrentMap<Route, MetricsStatistics> statistics = new ConcurrentHashMap<Route, MetricsStatistics>();
+
+    /** Metrics of a single route; the running timer is stored on the exchange under a route specific property. */
+    private static final class MetricsStatistics {
+        private final String property;
+        private final Counter total;
+        private final Counter inflight;
+        private final Meter requests;
+        private final Timer responses;
+
+        private MetricsStatistics(String property, Counter total, Counter inflight, Meter requests, Timer responses) {
+            this.property = property;
+            this.total = total;
+            this.inflight = inflight;
+            this.requests = requests;
+            this.responses = responses;
+        }
+
+        public void onExchangeBegin(Exchange exchange) {
+            total.inc();
+            inflight.inc();
+            requests.mark();
+            // route specific key, so an exchange passing several routes with this policy keeps one timer per route
+            exchange.setProperty(property, responses.time());
+        }
+
+        public void onExchangeDone(Exchange exchange) {
+            inflight.dec();
+
+            Timer.Context context = exchange.getProperty(property, Timer.Context.class);
+            if (context != null) {
+                context.stop();
+            }
+        }
+    }
+
+    /** Looks up the shared {@link MetricsRegistryService}, adding it on first use, and creates the metrics of the route. */
+    @Override
+    public void onInit(Route route) {
+        super.onInit(route);
+        try {
+            registry = route.getRouteContext().getCamelContext().hasService(MetricsRegistryService.class);
+            if (registry == null) {
+                registry = new MetricsRegistryService();
+                route.getRouteContext().getCamelContext().addService(registry);
+            }
+        } catch (Exception e) {
+            throw ObjectHelper.wrapRuntimeCamelException(e);
+        }
+
+        MetricsStatistics stats = statistics.get(route);
+        if (stats == null) {
+            Counter total = registry.getRegistry().counter(createName(route, "total"));
+            Counter inflight = registry.getRegistry().counter(createName(route, "inflight"));
+            Meter requests = registry.getRegistry().meter(createName(route, "requests"));
+            Timer responses = registry.getRegistry().timer(createName(route, "responses"));
+            stats = new MetricsStatistics("MetricsRoutePolicy-" + route.getId(), total, inflight, requests, responses);
+            statistics.putIfAbsent(route, stats);
+        }
+    }
+
+    // takes the route as parameter instead of reading a shared field, as the policy may be used by many routes
+    private static String createName(Route route, String type) {
+        return route.getRouteContext().getCamelContext().getManagementName() + "-" + route.getId() + "-" + type;
+    }
+
+    @Override
+    public void onExchangeBegin(Route route, Exchange exchange) {
+        MetricsStatistics stats = statistics.get(route);
+        if (stats != null) {
+            stats.onExchangeBegin(exchange);
+        }
+    }
+
+    @Override
+    public void onExchangeDone(Route route, Exchange exchange) {
+        MetricsStatistics stats = statistics.get(route);
+        if (stats != null) {
+            stats.onExchangeDone(exchange);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/c672d44b/components/camel-metrics/src/test/java/org/apache/camel/component/metrics/routepolicy/MetricsRoutePolicyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-metrics/src/test/java/org/apache/camel/component/metrics/routepolicy/MetricsRoutePolicyTest.java b/components/camel-metrics/src/test/java/org/apache/camel/component/metrics/routepolicy/MetricsRoutePolicyTest.java
new file mode 100644
index 0000000..8903131
--- /dev/null
+++ b/components/camel-metrics/src/test/java/org/apache/camel/component/metrics/routepolicy/MetricsRoutePolicyTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.metrics.routepolicy;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.RoutePolicy;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class MetricsRoutePolicyTest extends CamelTestSupport {
+
+    @Override
+    protected boolean useJmx() {
+        return true; // JMX is turned on as the route metrics are reported to the MBeanServer
+    }
+
+    @Test
+    public void testMetricsRoutePolicy() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(10);
+        // send exactly as many messages as the mock expects, alternating between the two routes
+        for (int i = 0; i < 10; i++) {
+            if (i % 2 == 0) {
+                template.sendBody("seda:foo", "Hello " + i);
+            } else {
+                template.sendBody("seda:bar", "Hello " + i);
+            }
+        }
+
+        assertMockEndpointsSatisfied();
+
+        // TODO: assert the jmx mbeans
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                RoutePolicy policy = new MetricsRoutePolicy(); // one instance shared by both routes
+
+                from("seda:foo").routeId("foo").routePolicy(policy)
+                    .delayer(100)
+                    .to("mock:result");
+
+                from("seda:bar").routeId("bar").routePolicy(policy)
+                    .delayer(250)
+                    .to("mock:result");
+            }
+        };
+    }
+}


[3/3] git commit: Polished

Posted by da...@apache.org.
Polished


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/76c39dcb
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/76c39dcb
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/76c39dcb

Branch: refs/heads/master
Commit: 76c39dcbe1226be9ed995f7a807702cb965e1208
Parents: b2c3b31
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Aug 13 16:16:34 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Aug 14 08:55:22 2014 +0200

----------------------------------------------------------------------
 .../apache/camel/processor/binding/RestBindingProcessor.java    | 5 -----
 1 file changed, 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/76c39dcb/camel-core/src/main/java/org/apache/camel/processor/binding/RestBindingProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/binding/RestBindingProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/binding/RestBindingProcessor.java
index ec1af34..429c005 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/binding/RestBindingProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/binding/RestBindingProcessor.java
@@ -41,11 +41,6 @@ import org.apache.camel.util.ObjectHelper;
  */
 public class RestBindingProcessor extends ServiceSupport implements AsyncProcessor {
 
-    // TODO: consumes/produces can be a list of media types, and prioritized 1st to last. (eg the q=weight option)
-    // TODO: use content-type from produces/consumes if possible to set as Content-Type if missing
-
-    // text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8
-
     private final AsyncProcessor jsonUnmarshal;
     private final AsyncProcessor xmlUnmarshal;
     private final AsyncProcessor jsonMarshal;


[2/3] git commit: CAMEL-7695: CamelContext - Allow to check if a service by its type has been added

Posted by da...@apache.org.
CAMEL-7695: CamelContext - Allow to check if a service by its type has been added


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/fa7e2254
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/fa7e2254
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/fa7e2254

Branch: refs/heads/master
Commit: fa7e2254f065f3e3e7bf28d73ca218c914676487
Parents: 76c39dc
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Aug 14 08:52:51 2014 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Aug 14 08:55:22 2014 +0200

----------------------------------------------------------------------
 .../src/main/java/org/apache/camel/CamelContext.java   |  8 ++++++++
 .../org/apache/camel/impl/DefaultCamelContext.java     | 13 ++++++++++++-
 .../org/apache/camel/impl/DefaultCamelContextTest.java | 13 +++++++++++++
 3 files changed, 33 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/fa7e2254/camel-core/src/main/java/org/apache/camel/CamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/CamelContext.java b/camel-core/src/main/java/org/apache/camel/CamelContext.java
index 5b175a3..8a3c31b 100644
--- a/camel-core/src/main/java/org/apache/camel/CamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/CamelContext.java
@@ -230,6 +230,14 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration {
     boolean hasService(Object object);
 
     /**
+     * Has the given service type already been added to this context?
+     *
+     * @param type the class type
+     * @return the service instance or <tt>null</tt> if not already added.
+     */
+    public <T> T hasService(Class<T> type);
+
+    /**
      * Adds the given listener to be invoked when {@link CamelContext} have just been started.
      * <p/>
      * This allows listeners to do any custom work after the routes and other services have been started and are running.

http://git-wip-us.apache.org/repos/asf/camel/blob/fa7e2254/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index dab1eaa..72defeb 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -66,6 +66,7 @@ import org.apache.camel.ShutdownRunningTask;
 import org.apache.camel.StartupListener;
 import org.apache.camel.StatefulService;
 import org.apache.camel.SuspendableService;
+import org.apache.camel.TypeConversionException;
 import org.apache.camel.TypeConverter;
 import org.apache.camel.VetoCamelContextStartException;
 import org.apache.camel.builder.ErrorHandlerBuilder;
@@ -161,7 +162,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
     private final List<EndpointStrategy> endpointStrategies = new ArrayList<EndpointStrategy>();
     private final Map<String, Component> components = new HashMap<String, Component>();
     private final Set<Route> routes = new LinkedHashSet<Route>();
-    private final List<Service> servicesToClose = new ArrayList<Service>();
+    private final List<Service> servicesToClose = new CopyOnWriteArrayList<Service>();
     private final Set<StartupListener> startupListeners = new LinkedHashSet<StartupListener>();
     private TypeConverter typeConverter;
     private TypeConverterRegistry typeConverterRegistry;
@@ -1031,6 +1032,16 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
         return false;
     }
 
+    @Override
+    public <T> T hasService(Class<T> type) { // looks up an added service by its type
+        for (Service service : servicesToClose) {
+            if (type.isInstance(service)) {
+                return type.cast(service); // first match in the iteration order of servicesToClose wins
+            }
+        }
+        return null; // no service in servicesToClose is an instance of the given type
+    }
+
     public void addStartupListener(StartupListener listener) throws Exception {
         // either add to listener so we can invoke then later when CamelContext has been started
         // or invoke the callback right now

http://git-wip-us.apache.org/repos/asf/camel/blob/fa7e2254/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextTest.java b/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextTest.java
index b7f51ae..3403ae4 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextTest.java
@@ -335,6 +335,19 @@ public class DefaultCamelContextTest extends TestSupport {
         assertEquals("Stopped", my.getStatus().name());
     }
 
+    public void testAddServiceType() throws Exception { // hasService(Class) before add, after add and after stop
+        MyService my = new MyService();
+
+        DefaultCamelContext ctx = new DefaultCamelContext();
+        assertNull(ctx.hasService(MyService.class)); // nothing has been added yet
+
+        ctx.addService(my);
+        assertSame(my, ctx.hasService(MyService.class)); // the very same instance is returned
+
+        ctx.stop();
+        assertNull(ctx.hasService(MyService.class)); // expected to be gone once the context has been stopped
+    }
+
     private static class MyService extends ServiceSupport implements CamelContextAware {
 
         private CamelContext camelContext;