You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/02/14 11:36:34 UTC
[camel] branch master updated: CAMEL-13199: Using @Produce as proxy
for sending to endpoint does not emit sending/sent events. Also use async
processing in unit of work producer.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new c07baef CAMEL-13199: Using @Produce as proxy for sending to endpoint does not emit sending/sent events. Also use async processing in unit of work producer.
c07baef is described below
commit c07baeff3795d7ff4c0426b97e5dba5f9da15934
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Feb 14 11:37:20 2019 +0100
CAMEL-13199: Using @Produce as proxy for sending to endpoint does not emit sending/sent events. Also use async processing in unit of work producer.
---
.../apache/camel/component/bean/ProxyHelper.java | 3 +-
.../camel/impl/CamelPostProcessorHelper.java | 5 +-
.../camel/processor/DeferServiceFactory.java | 2 +
.../camel/processor/EventNotifierProducer.java | 109 +++++++++++++++++++++
.../apache/camel/processor/UnitOfWorkProducer.java | 10 +-
.../CamelProduceInterfaceEventNotifierTest.java | 93 ++++++++++++++++++
6 files changed, 216 insertions(+), 6 deletions(-)
diff --git a/core/camel-core/src/main/java/org/apache/camel/component/bean/ProxyHelper.java b/core/camel-core/src/main/java/org/apache/camel/component/bean/ProxyHelper.java
index 9cf040c..26253b2 100644
--- a/core/camel-core/src/main/java/org/apache/camel/component/bean/ProxyHelper.java
+++ b/core/camel-core/src/main/java/org/apache/camel/component/bean/ProxyHelper.java
@@ -21,6 +21,8 @@ import java.lang.reflect.Proxy;
import org.apache.camel.Endpoint;
import org.apache.camel.Producer;
import org.apache.camel.processor.DeferServiceFactory;
+import org.apache.camel.processor.EventNotifierProducer;
+import org.apache.camel.processor.UnitOfWorkProducer;
/**
* A helper class for creating proxies which delegate to Camel
@@ -53,7 +55,6 @@ public final class ProxyHelper {
*/
public static <T> T createProxy(Endpoint endpoint, boolean binding, ClassLoader cl, Class<T>[] interfaceClasses, MethodInfoCache methodCache) throws Exception {
Producer producer = DeferServiceFactory.createProducer(endpoint);
- endpoint.getCamelContext().deferStartService(producer, true);
return createProxyObject(endpoint, binding, producer, cl, interfaceClasses, methodCache);
}
diff --git a/core/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java b/core/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
index 0375c7e..2845ac8 100644
--- a/core/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
+++ b/core/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
@@ -40,6 +40,8 @@ import org.apache.camel.Service;
import org.apache.camel.builder.DefaultFluentProducerTemplate;
import org.apache.camel.component.bean.ProxyHelper;
import org.apache.camel.processor.DeferServiceFactory;
+import org.apache.camel.processor.EventNotifierProducer;
+import org.apache.camel.processor.SendProcessor;
import org.apache.camel.processor.UnitOfWorkProducer;
import org.apache.camel.support.CamelContextHelper;
import org.apache.camel.support.IntrospectionSupport;
@@ -384,8 +386,7 @@ public class CamelPostProcessorHelper implements CamelContextAware {
*/
protected Producer createInjectionProducer(Endpoint endpoint, Object bean, String beanName) {
try {
- Producer producer = DeferServiceFactory.createProducer(endpoint);
- return new UnitOfWorkProducer(producer);
+ return DeferServiceFactory.createProducer(endpoint);
} catch (Exception e) {
throw RuntimeCamelException.wrapRuntimeCamelException(e);
}
diff --git a/core/camel-core/src/main/java/org/apache/camel/processor/DeferServiceFactory.java b/core/camel-core/src/main/java/org/apache/camel/processor/DeferServiceFactory.java
index 26602ca..ae69481 100644
--- a/core/camel-core/src/main/java/org/apache/camel/processor/DeferServiceFactory.java
+++ b/core/camel-core/src/main/java/org/apache/camel/processor/DeferServiceFactory.java
@@ -43,6 +43,8 @@ public final class DeferServiceFactory {
*/
public static Producer createProducer(Endpoint endpoint) throws Exception {
Producer producer = new DeferProducer(endpoint);
+ producer = new UnitOfWorkProducer(producer);
+ producer = new EventNotifierProducer(producer);
endpoint.getCamelContext().deferStartService(producer, true);
return producer;
}
diff --git a/core/camel-core/src/main/java/org/apache/camel/processor/EventNotifierProducer.java b/core/camel-core/src/main/java/org/apache/camel/processor/EventNotifierProducer.java
new file mode 100644
index 0000000..a646349
--- /dev/null
+++ b/core/camel-core/src/main/java/org/apache/camel/processor/EventNotifierProducer.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProducer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+import org.apache.camel.support.AsyncProcessorConverterHelper;
+import org.apache.camel.support.AsyncProcessorSupport;
+import org.apache.camel.support.DefaultAsyncProducer;
+import org.apache.camel.support.EventHelper;
+import org.apache.camel.support.service.ServiceHelper;
+import org.apache.camel.util.StopWatch;
+
+/**
+ * Wraps a {@link Producer} so that {@link org.apache.camel.spi.EventNotifier} notifications are
+ * emitted around every exchange it sends.
+ */
+public final class EventNotifierProducer extends DefaultAsyncProducer {
+
+    private final AsyncProducer producer;
+
+    /**
+     * Creates a producer that delegates to the given producer and emits
+     * {@link org.apache.camel.spi.EventNotifier} notifications for each send.
+     *
+     * @param producer the producer to delegate to
+     */
+    public EventNotifierProducer(Producer producer) {
+        super(producer.getEndpoint());
+        this.producer = AsyncProcessorConverterHelper.convert(producer);
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        if (!isStarted()) {
+            exchange.setException(new IllegalStateException("Producer has not been started: " + this));
+            callback.done(true);
+            return true;
+        }
+
+        // the stop watch is only created when the sending notification reported true
+        final boolean sending = EventHelper.notifyExchangeSending(exchange.getContext(), exchange, getEndpoint());
+        final StopWatch watch = sending ? new StopWatch() : null;
+
+        try {
+            log.debug(">>>> {} {}", getEndpoint(), exchange);
+            return producer.process(exchange, sentNotifyingCallback(exchange, watch, callback));
+        } catch (Throwable throwable) {
+            // the delegate threw: record the failure on the exchange and complete synchronously
+            exchange.setException(throwable);
+            callback.done(true);
+            return true;
+        }
+    }
+
+    /** Emits the sent event (if a stop watch was started), then completes the caller's callback. */
+    private AsyncCallback sentNotifyingCallback(Exchange exchange, StopWatch watch, AsyncCallback callback) {
+        return new AsyncCallback() {
+            @Override
+            public void done(boolean doneSync) {
+                try {
+                    if (watch != null) {
+                        long elapsed = watch.taken();
+                        EventHelper.notifyExchangeSent(exchange.getContext(), exchange, getEndpoint(), elapsed);
+                    }
+                } finally {
+                    callback.done(doneSync);
+                }
+            }
+        };
+    }
+
+    @Override
+    public Endpoint getEndpoint() {
+        return producer.getEndpoint();
+    }
+
+    @Override
+    public boolean isSingleton() {
+        return producer.isSingleton();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        ServiceHelper.startService(producer);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopService(producer);
+    }
+}
diff --git a/core/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java b/core/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
index 965a25d..b45c296 100644
--- a/core/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
+++ b/core/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java
@@ -16,16 +16,18 @@
*/
package org.apache.camel.processor;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Producer;
+import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.support.service.ServiceHelper;
/**
* Ensures a {@link Producer} is executed within an {@link org.apache.camel.spi.UnitOfWork}.
*/
-public final class UnitOfWorkProducer implements Producer {
+public final class UnitOfWorkProducer extends DefaultAsyncProducer {
private final Producer producer;
private final AsyncProcessor processor;
@@ -36,6 +38,7 @@ public final class UnitOfWorkProducer implements Producer {
* @param producer the producer
*/
public UnitOfWorkProducer(Producer producer) {
+ super(producer.getEndpoint());
this.producer = producer;
// wrap in unit of work
CamelInternalProcessor internal = new CamelInternalProcessor(producer);
@@ -47,8 +50,9 @@ public final class UnitOfWorkProducer implements Producer {
return producer.getEndpoint();
}
- public void process(final Exchange exchange) throws Exception {
- processor.process(exchange);
+ @Override
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ return processor.process(exchange, callback);
}
public void start() throws Exception {
diff --git a/core/camel-core/src/test/java/org/apache/camel/impl/CamelProduceInterfaceEventNotifierTest.java b/core/camel-core/src/test/java/org/apache/camel/impl/CamelProduceInterfaceEventNotifierTest.java
new file mode 100644
index 0000000..24e5c82
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/impl/CamelProduceInterfaceEventNotifierTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Produce;
+import org.apache.camel.spi.CamelEvent;
+import org.apache.camel.support.EventNotifierSupport;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CamelProduceInterfaceEventNotifierTest extends ContextTestSupport {
+
+    private static List<CamelEvent> events = new ArrayList<>();
+
+    private DefaultCamelBeanPostProcessor postProcessor;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        postProcessor = new DefaultCamelBeanPostProcessor(context);
+    }
+
+    /** Creates a context whose notifier records only exchange sending and sent events. */
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        DefaultCamelContext camelContext = new DefaultCamelContext(createRegistry());
+        camelContext.getManagementStrategy().addEventNotifier(new EventNotifierSupport() {
+            public void notify(CamelEvent event) throws Exception {
+                boolean isSending = event instanceof CamelEvent.ExchangeSendingEvent;
+                boolean isSent = event instanceof CamelEvent.ExchangeSentEvent;
+                if (isSending || isSent) {
+                    events.add(event);
+                }
+            }
+        });
+        return camelContext;
+    }
+
+    /**
+     * Sends one message through the {@link Produce} injected proxy and expects exactly one
+     * sending event followed by one sent event.
+     */
+    @Test
+    public void testPostProcessor() throws Exception {
+        events.clear();
+        assertEquals(0, events.size());
+
+        MySender sender = new MySender();
+        postProcessor.postProcessBeforeInitialization(sender, "foo");
+        postProcessor.postProcessAfterInitialization(sender, "foo");
+
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+        sender.hello.sayHello("Hello World");
+        assertMockEndpointsSatisfied();
+
+        // the notifier must have seen the sending event first, then the sent event
+        assertEquals(2, events.size());
+        assertTrue(events.get(0) instanceof CamelEvent.ExchangeSendingEvent);
+        assertTrue(events.get(1) instanceof CamelEvent.ExchangeSentEvent);
+    }
+
+    interface FooService {
+
+        void sayHello(String hello);
+    }
+
+    class MySender {
+
+        @Produce(uri = "mock:result")
+        FooService hello;
+
+    }
+}