You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/02/14 11:36:34 UTC

[camel] branch master updated: CAMEL-13199: Using @Produce as proxy for sending to endpoint does not emit sending/sent events. Also use async processing in unit of work producer.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new c07baef  CAMEL-13199: Using @Produce as proxy for sending to endpoint does not emit sending/sent events. Also use async processing in unit of work producer.
c07baef is described below

commit c07baeff3795d7ff4c0426b97e5dba5f9da15934
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Feb 14 11:37:20 2019 +0100

    CAMEL-13199: Using @Produce as proxy for sending to endpoint does not emit sending/sent events. Also use async processing in unit of work producer.
---
 .../apache/camel/component/bean/ProxyHelper.java   |   3 +-
 .../camel/impl/CamelPostProcessorHelper.java       |   5 +-
 .../camel/processor/DeferServiceFactory.java       |   2 +
 .../camel/processor/EventNotifierProducer.java     | 109 +++++++++++++++++++++
 .../apache/camel/processor/UnitOfWorkProducer.java |  10 +-
 .../CamelProduceInterfaceEventNotifierTest.java    |  93 ++++++++++++++++++
 6 files changed, 216 insertions(+), 6 deletions(-)

diff --git a/core/camel-core/src/main/java/org/apache/camel/component/bean/ProxyHelper.java b/core/camel-core/src/main/java/org/apache/camel/component/bean/ProxyHelper.java
index 9cf040c..26253b2 100644
--- a/core/camel-core/src/main/java/org/apache/camel/component/bean/ProxyHelper.java
+++ b/core/camel-core/src/main/java/org/apache/camel/component/bean/ProxyHelper.java
@@ -21,6 +21,8 @@ import java.lang.reflect.Proxy;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Producer;
 import org.apache.camel.processor.DeferServiceFactory;
+import org.apache.camel.processor.EventNotifierProducer;
+import org.apache.camel.processor.UnitOfWorkProducer;
 
 /**
  * A helper class for creating proxies which delegate to Camel
@@ -53,7 +55,6 @@ public final class ProxyHelper {
      */
     public static <T> T createProxy(Endpoint endpoint, boolean binding, ClassLoader cl, Class<T>[] interfaceClasses, MethodInfoCache methodCache) throws Exception {
         Producer producer = DeferServiceFactory.createProducer(endpoint);
-        endpoint.getCamelContext().deferStartService(producer, true);
         return createProxyObject(endpoint, binding, producer, cl, interfaceClasses, methodCache);
     }
 
diff --git a/core/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java b/core/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
index 0375c7e..2845ac8 100644
--- a/core/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
+++ b/core/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
@@ -40,6 +40,8 @@ import org.apache.camel.Service;
 import org.apache.camel.builder.DefaultFluentProducerTemplate;
 import org.apache.camel.component.bean.ProxyHelper;
 import org.apache.camel.processor.DeferServiceFactory;
+import org.apache.camel.processor.EventNotifierProducer;
+import org.apache.camel.processor.SendProcessor;
 import org.apache.camel.processor.UnitOfWorkProducer;
 import org.apache.camel.support.CamelContextHelper;
 import org.apache.camel.support.IntrospectionSupport;
@@ -384,8 +386,7 @@ public class CamelPostProcessorHelper implements CamelContextAware {
      */
     protected Producer createInjectionProducer(Endpoint endpoint, Object bean, String beanName) {
         try {
-            Producer producer = DeferServiceFactory.createProducer(endpoint);
-            return new UnitOfWorkProducer(producer);
+            return DeferServiceFactory.createProducer(endpoint);
         } catch (Exception e) {
             throw RuntimeCamelException.wrapRuntimeCamelException(e);
         }
diff --git a/core/camel-core/src/main/java/org/apache/camel/processor/DeferServiceFactory.java b/core/camel-core/src/main/java/org/apache/camel/processor/DeferServiceFactory.java
index 26602ca..ae69481 100644
--- a/core/camel-core/src/main/java/org/apache/camel/processor/DeferServiceFactory.java
+++ b/core/camel-core/src/main/java/org/apache/camel/processor/DeferServiceFactory.java
@@ -43,6 +43,8 @@ public final class DeferServiceFactory {
      */
     public static Producer createProducer(Endpoint endpoint) throws Exception {
         Producer producer = new DeferProducer(endpoint);
+        producer = new UnitOfWorkProducer(producer);
+        producer = new EventNotifierProducer(producer);
         endpoint.getCamelContext().deferStartService(producer, true);
         return producer;
     }
diff --git a/core/camel-core/src/main/java/org/apache/camel/processor/EventNotifierProducer.java b/core/camel-core/src/main/java/org/apache/camel/processor/EventNotifierProducer.java
new file mode 100644
index 0000000..a646349
--- /dev/null
+++ b/core/camel-core/src/main/java/org/apache/camel/processor/EventNotifierProducer.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProducer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+import org.apache.camel.support.AsyncProcessorConverterHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
+import org.apache.camel.support.DefaultAsyncProducer;
+import org.apache.camel.support.EventHelper;
+import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.util.StopWatch;
+
+/**
+ * Ensures a {@link Producer} sends {@link org.apache.camel.spi.EventNotifier} notifications when
+ * sending.
+ */
+public final class EventNotifierProducer extends DefaultAsyncProducer {
+
+    private final AsyncProducer producer;
+
+    /**
+     * Wraps the producer which should be executed so that it emits {@link org.apache.camel.spi.EventNotifier} notifications.
+     *
+     * @param producer the producer
+     */
+    public EventNotifierProducer(Producer producer) {
+        super(producer.getEndpoint());
+        this.producer = AsyncProcessorConverterHelper.convert(producer);
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        if (!isStarted()) {
+            exchange.setException(new IllegalStateException("Producer has not been started: " + this));
+            callback.done(true);
+            return true;
+        }
+
+        final boolean sending = EventHelper.notifyExchangeSending(exchange.getContext(), exchange, getEndpoint());
+        // record timing for sending the exchange using the producer
+        StopWatch watch;
+        if (sending) {
+            watch = new StopWatch();
+        } else {
+            watch = null;
+        }
+
+        try {
+            log.debug(">>>> {} {}", getEndpoint(), exchange);
+            return producer.process(exchange, new AsyncCallback() {
+                @Override
+                public void done(boolean doneSync) {
+                    try {
+                        // emit event that the exchange was sent to the endpoint
+                        if (watch != null) {
+                            long timeTaken = watch.taken();
+                            EventHelper.notifyExchangeSent(exchange.getContext(), exchange, getEndpoint(), timeTaken);
+                        }
+                    } finally {
+                        callback.done(doneSync);
+                    }
+                }
+            });
+        } catch (Throwable throwable) {
+            exchange.setException(throwable);
+            callback.done(true);
+        }
+
+        return true;
+    }
+
+    @Override
+    public Endpoint getEndpoint() {
+        return producer.getEndpoint();
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return producer.isSingleton();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        ServiceHelper.startService(producer);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopService(producer);
+    }
+}
diff --git a/core/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java b/core/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
index 965a25d..b45c296 100644
--- a/core/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
+++ b/core/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
@@ -16,16 +16,18 @@
  */
 package org.apache.camel.processor;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Producer;
+import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.support.service.ServiceHelper;
 
 /**
  * Ensures a {@link Producer} is executed within an {@link org.apache.camel.spi.UnitOfWork}.
  */
-public final class UnitOfWorkProducer implements Producer {
+public final class UnitOfWorkProducer extends DefaultAsyncProducer {
 
     private final Producer producer;
     private final AsyncProcessor processor;
@@ -36,6 +38,7 @@ public final class UnitOfWorkProducer implements Producer {
      * @param producer the producer
      */
     public UnitOfWorkProducer(Producer producer) {
+        super(producer.getEndpoint());
         this.producer = producer;
         // wrap in unit of work
         CamelInternalProcessor internal = new CamelInternalProcessor(producer);
@@ -47,8 +50,9 @@ public final class UnitOfWorkProducer implements Producer {
         return producer.getEndpoint();
     }
 
-    public void process(final Exchange exchange) throws Exception {
-        processor.process(exchange);
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        return processor.process(exchange, callback);
     }
 
     public void start() throws Exception {
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/CamelProduceInterfaceEventNotifierTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/CamelProduceInterfaceEventNotifierTest.java
new file mode 100644
index 0000000..24e5c82
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/CamelProduceInterfaceEventNotifierTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Produce;
+import org.apache.camel.spi.CamelEvent;
+import org.apache.camel.support.EventNotifierSupport;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CamelProduceInterfaceEventNotifierTest extends ContextTestSupport {
+
+    private static List<CamelEvent> events = new ArrayList<>();
+
+    private DefaultCamelBeanPostProcessor postProcessor;
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        DefaultCamelContext context = new DefaultCamelContext(createRegistry());
+        context.getManagementStrategy().addEventNotifier(new EventNotifierSupport() {
+            public void notify(CamelEvent event) throws Exception {
+                if (event instanceof CamelEvent.ExchangeSendingEvent || event instanceof CamelEvent.ExchangeSentEvent) {
+                    events.add(event);
+                }
+            }
+        });
+        return context;
+    }
+
+    @Test
+    public void testPostProcessor() throws Exception {
+        events.clear();
+
+        int before = events.size();
+        assertEquals(0, before);
+
+        MySender sender = new MySender();
+
+        postProcessor.postProcessBeforeInitialization(sender, "foo");
+        postProcessor.postProcessAfterInitialization(sender, "foo");
+
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        sender.hello.sayHello("Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        int after = events.size();
+        // should be 2 events
+        assertEquals(2, after);
+        assertTrue(events.get(0) instanceof CamelEvent.ExchangeSendingEvent);
+        assertTrue(events.get(1) instanceof CamelEvent.ExchangeSentEvent);
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        postProcessor = new DefaultCamelBeanPostProcessor(context);
+    }
+
+    interface FooService {
+
+        void sayHello(String hello);
+    }
+
+    class MySender {
+
+        @Produce(uri = "mock:result")
+        FooService hello;
+
+    }
+
+}