You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/08/12 16:55:14 UTC

git commit: CAMEL-6628: ProducerTemplate allows to turn off event notifier for sending/sent exchanges.

Updated Branches:
  refs/heads/master b96c92e56 -> 81d0a174b


CAMEL-6628: ProducerTemplate allows to turn off event notifier for sending/sent exchanges.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/81d0a174
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/81d0a174
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/81d0a174

Branch: refs/heads/master
Commit: 81d0a174b81ed2b8005327f48b9ffd6d3718c03a
Parents: b96c92e
Author: Claus Ibsen <da...@apache.org>
Authored: Mon Aug 12 16:54:43 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon Aug 12 16:54:43 2013 +0200

----------------------------------------------------------------------
 .../java/org/apache/camel/ProducerTemplate.java | 20 ++++++
 .../mbean/ManagedProducerCacheMBean.java        |  3 +
 .../camel/impl/DefaultProducerTemplate.java     | 14 +++++
 .../org/apache/camel/impl/ProducerCache.java    | 34 +++++++---
 .../management/mbean/ManagedProducerCache.java  |  3 +
 .../camel/support/EventNotifierSupport.java     |  8 +++
 .../camel/processor/MySentEventNotifier.java    | 49 +++++++++++++++
 ...roducerTemplateDisableEventNotifierTest.java | 65 ++++++++++++++++++++
 8 files changed, 186 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/81d0a174/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
index aedb7d3..32533c1 100644
--- a/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
+++ b/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
@@ -112,6 +112,26 @@ public interface ProducerTemplate extends Service {
      */
     void setDefaultEndpointUri(String endpointUri);
 
+    /**
+     * Sets whether the {@link org.apache.camel.spi.EventNotifier} should be
+     * used by this {@link ProducerTemplate} to send events about the {@link Exchange}
+     * being sent.
+     * <p/>
+     * By default this is enabled.
+     *
+     * @param enabled <tt>true</tt> to enable, <tt>false</tt> to disable.
+     */
+    void setEventNotifierEnabled(boolean enabled);
+
+    /**
+     * Whether the {@link org.apache.camel.spi.EventNotifier} should be
+     * used by this {@link ProducerTemplate} to send events about the {@link Exchange}
+     * being sent.
+     *
+     * @return <tt>true</tt> if enabled, <tt>false</tt> otherwise
+     */
+    boolean isEventNotifierEnabled();
+
     // Synchronous methods
     // -----------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/camel/blob/81d0a174/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedProducerCacheMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedProducerCacheMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedProducerCacheMBean.java
index 6758615..aeb9474 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedProducerCacheMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedProducerCacheMBean.java
@@ -45,4 +45,7 @@ public interface ManagedProducerCacheMBean extends ManagedServiceMBean {
     @ManagedOperation(description = "Purges the cache")
     void purge();
 
+    @ManagedAttribute(description = "EventNotifier enabled")
+    Boolean isEventNotifierEnabled();
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/81d0a174/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
index bce26e8..7f852f7 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
@@ -52,6 +52,7 @@ public class DefaultProducerTemplate extends ServiceSupport implements ProducerT
     private volatile ExecutorService executor;
     private Endpoint defaultEndpoint;
     private int maximumCacheSize;
+    private boolean eventNotifierEnabled = true;
 
     public DefaultProducerTemplate(CamelContext camelContext) {
         this.camelContext = camelContext;
@@ -87,6 +88,18 @@ public class DefaultProducerTemplate extends ServiceSupport implements ProducerT
         return producerCache.size();
     }
 
+    public boolean isEventNotifierEnabled() {
+        return eventNotifierEnabled;
+    }
+
+    public void setEventNotifierEnabled(boolean eventNotifierEnabled) {
+        this.eventNotifierEnabled = eventNotifierEnabled;
+        // if we already created the cache then adjust its setting as well
+        if (producerCache != null) {
+            producerCache.setEventNotifierEnabled(eventNotifierEnabled);
+        }
+    }
+
     public Exchange send(String endpointUri, Exchange exchange) {
         Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
         return send(endpoint, exchange);
@@ -717,6 +730,7 @@ public class DefaultProducerTemplate extends ServiceSupport implements ProducerT
             } else {
                 producerCache = new ProducerCache(this, camelContext);
             }
+            producerCache.setEventNotifierEnabled(isEventNotifierEnabled());
         }
         ServiceHelper.startService(producerCache);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/81d0a174/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
index 8c5f976..99023d0 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
@@ -54,6 +54,7 @@ public class ProducerCache extends ServiceSupport {
     private final ServicePool<Endpoint, Producer> pool;
     private final Map<String, Producer> producers;
     private final Object source;
+    private boolean eventNotifierEnabled = true;
 
     public ProducerCache(Object source, CamelContext camelContext) {
         this(source, camelContext, CamelContextHelper.getMaximumCachePoolSize(camelContext));
@@ -74,6 +75,14 @@ public class ProducerCache extends ServiceSupport {
         this.producers = cache;
     }
 
+    public boolean isEventNotifierEnabled() {
+        return eventNotifierEnabled;
+    }
+
+    public void setEventNotifierEnabled(boolean eventNotifierEnabled) {
+        this.eventNotifierEnabled = eventNotifierEnabled;
+    }
+
     /**
      * Creates the {@link LRUCache} to be used.
      * <p/>
@@ -220,13 +229,13 @@ public class ProducerCache extends ServiceSupport {
         }
 
         StopWatch watch = null;
-        if (exchange != null) {
+        if (eventNotifierEnabled && exchange != null) {
             // record timing for sending the exchange using the producer
             watch = new StopWatch();
         }
 
         try {
-            if (exchange != null) {
+            if (eventNotifierEnabled && exchange != null) {
                 EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint);
             }
             // invoke the callback
@@ -236,7 +245,7 @@ public class ProducerCache extends ServiceSupport {
                 exchange.setException(e);
             }
         } finally {
-            if (exchange != null) {
+            if (eventNotifierEnabled && exchange != null) {
                 long timeTaken = watch.stop();
                 // emit event that the exchange was sent to the endpoint
                 EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
@@ -287,10 +296,10 @@ public class ProducerCache extends ServiceSupport {
         }
 
         // record timing for sending the exchange using the producer
-        final StopWatch watch = exchange != null ? new StopWatch() : null;
+        final StopWatch watch = eventNotifierEnabled && exchange != null ? new StopWatch() : null;
 
         try {
-            if (exchange != null) {
+            if (eventNotifierEnabled && exchange != null) {
                 EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint);
             }
             // invoke the callback
@@ -299,7 +308,7 @@ public class ProducerCache extends ServiceSupport {
                 @Override
                 public void done(boolean doneSync) {
                     try {
-                        if (watch != null) {
+                        if (eventNotifierEnabled && watch != null) {
                             long timeTaken = watch.stop();
                             // emit event that the exchange was sent to the endpoint
                             EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
@@ -358,9 +367,12 @@ public class ProducerCache extends ServiceSupport {
                 exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri());
 
                 // send the exchange using the processor
-                StopWatch watch = new StopWatch();
+                StopWatch watch = null;
                 try {
-                    EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint);
+                    if (eventNotifierEnabled) {
+                        watch = new StopWatch();
+                        EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint);
+                    }
                     // ensure we run in an unit of work
                     Producer target = new UnitOfWorkProducer(producer);
                     target.process(exchange);
@@ -369,8 +381,10 @@ public class ProducerCache extends ServiceSupport {
                     exchange.setException(e);
                 } finally {
                     // emit event that the exchange was sent to the endpoint
-                    long timeTaken = watch.stop();
-                    EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
+                    if (eventNotifierEnabled && watch != null) {
+                        long timeTaken = watch.stop();
+                        EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
+                    }
                 }
                 return exchange;
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/81d0a174/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedProducerCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedProducerCache.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedProducerCache.java
index 5cb57b0..ef07077 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedProducerCache.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedProducerCache.java
@@ -72,4 +72,7 @@ public class ManagedProducerCache extends ManagedService implements ManagedProdu
         producerCache.purge();
     }
 
+    public Boolean isEventNotifierEnabled() {
+        return producerCache.isEventNotifierEnabled();
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/81d0a174/camel-core/src/main/java/org/apache/camel/support/EventNotifierSupport.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/support/EventNotifierSupport.java b/camel-core/src/main/java/org/apache/camel/support/EventNotifierSupport.java
index 2bc2c08..68dcc81 100644
--- a/camel-core/src/main/java/org/apache/camel/support/EventNotifierSupport.java
+++ b/camel-core/src/main/java/org/apache/camel/support/EventNotifierSupport.java
@@ -117,4 +117,12 @@ public abstract class EventNotifierSupport extends ServiceSupport implements Eve
     public void setIgnoreExchangeSendingEvents(boolean ignoreExchangeSendingEvents) {
         this.ignoreExchangeSendingEvents = ignoreExchangeSendingEvents;
     }
+
+    protected void doStart() throws Exception {
+        // noop
+    }
+
+    protected void doStop() throws Exception {
+        // noop
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/81d0a174/camel-core/src/test/java/org/apache/camel/processor/MySentEventNotifier.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/MySentEventNotifier.java b/camel-core/src/test/java/org/apache/camel/processor/MySentEventNotifier.java
new file mode 100644
index 0000000..117c104
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/MySentEventNotifier.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.ArrayList;
+import java.util.EventObject;
+import java.util.List;
+
+import org.apache.camel.management.event.ExchangeSentEvent;
+import org.apache.camel.support.EventNotifierSupport;
+
+/**
+ * @version 
+ */
+public class MySentEventNotifier extends EventNotifierSupport {
+
+    private final List<ExchangeSentEvent> events = new ArrayList<ExchangeSentEvent>();
+
+    public List<ExchangeSentEvent> getEvents() {
+        return events;
+    }
+
+    public void notify(EventObject event) throws Exception {
+        if (event instanceof ExchangeSentEvent) {
+            ExchangeSentEvent sent = (ExchangeSentEvent) event;
+            events.add(sent);
+        }
+    }
+
+    public boolean isEnabled(EventObject event) {
+        // we only want the sent events
+        return event instanceof ExchangeSentEvent;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/81d0a174/camel-core/src/test/java/org/apache/camel/processor/ProducerTemplateDisableEventNotifierTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ProducerTemplateDisableEventNotifierTest.java b/camel-core/src/test/java/org/apache/camel/processor/ProducerTemplateDisableEventNotifierTest.java
new file mode 100644
index 0000000..ad9855c
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ProducerTemplateDisableEventNotifierTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+
+/**
+ * @version 
+ */
+public class ProducerTemplateDisableEventNotifierTest extends ContextTestSupport {
+
+    private MySentEventNotifier notifier = new MySentEventNotifier();
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        DefaultCamelContext context = (DefaultCamelContext) super.createCamelContext();
+        context.getManagementStrategy().addEventNotifier(notifier);
+        return context;
+    }
+
+    public void testExchangeSent() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        // we dont want events for producer template itself so disable that
+        ProducerTemplate other = context.createProducerTemplate();
+        other.setEventNotifierEnabled(false);
+
+        other.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(2, notifier.getEvents().size());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("direct:bar").to("mock:result");
+
+                from("direct:bar").delay(1);
+            }
+        };
+    }
+
+}
\ No newline at end of file