You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/08/04 09:01:36 UTC
[camel] branch master updated: CAMEL-13783: Fix the
camel-archetype-component to inherit from DefaultConsumer instead of
ScheduledPollConsumer (#3059)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 82581fb CAMEL-13783: Fix the camel-archetype-component to inherit from DefaultConsumer instead of ScheduledPollConsumer (#3059)
82581fb is described below
commit 82581fb4f08f03527d82b792132c5b261e9dcca5
Author: Omar Al-Safi <om...@gmail.com>
AuthorDate: Sun Aug 4 11:01:21 2019 +0200
CAMEL-13783: Fix the camel-archetype-component to inherit from DefaultConsumer instead of ScheduledPollConsumer (#3059)
* Change to DefaultConsumer in camel-archetype-component
* Change to DefaultConsumer import package
Signed-off-by: Omar Al-Safi <om...@gmail.com>
* Add event bus helper to broadcast the events
Signed-off-by: Omar Al-Safi <om...@gmail.com>
---
.../java/EventBusHelper.java} | 55 +++++++++++++---------
.../src/main/java/__name__Consumer.java | 49 +++++++++++++++----
.../src/main/java/__name__Endpoint.java | 6 +++
.../src/test/java/__name__ComponentTest.java | 27 +++++++++--
4 files changed, 101 insertions(+), 36 deletions(-)
diff --git a/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/test/java/__name__ComponentTest.java b/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/main/java/EventBusHelper.java
similarity index 52%
copy from archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/test/java/__name__ComponentTest.java
copy to archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/main/java/EventBusHelper.java
index 6f723bd..68ea9cd 100644
--- a/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/test/java/__name__ComponentTest.java
+++ b/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/main/java/EventBusHelper.java
@@ -16,29 +16,38 @@
## ------------------------------------------------------------------------
package ${package};
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.test.junit4.CamelTestSupport;
-import org.junit.Test;
-
-public class ${name}ComponentTest extends CamelTestSupport {
-
- @Test
- public void test${name}() throws Exception {
- MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedMinimumMessageCount(1);
-
- assertMockEndpointsSatisfied();
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+public class EventBusHelper {
+
+ private static EventBusHelper INSTANCE;
+
+ final private Set<Consumer> subscribers = ConcurrentHashMap.newKeySet();
+
+ private EventBusHelper(){ }
+
+ public static EventBusHelper getInstance(){
+ if (INSTANCE == null){
+ INSTANCE = new EventBusHelper();
+ }
+
+ return INSTANCE;
}
- @Override
- protected RouteBuilder createRouteBuilder() throws Exception {
- return new RouteBuilder() {
- public void configure() {
- from("${scheme}://foo")
- .to("${scheme}://bar")
- .to("mock:result");
- }
- };
+ public <T> void subscribe(final Consumer<T> subscriber) {
+ subscribers.add(subscriber);
}
-}
+
+ @SuppressWarnings("unchecked")
+ public <T> void publish(final T event){
+ // Notify all subscribers
+ subscribers.forEach(consumer -> publishSingleEvent(event, consumer));
+ }
+
+ private <T> void publishSingleEvent(final T event, final Consumer<T> subscriber){
+ subscriber.accept(event);
+ }
+
+}
\ No newline at end of file
diff --git a/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/main/java/__name__Consumer.java b/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/main/java/__name__Consumer.java
index 42cf771..9611c6a 100644
--- a/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/main/java/__name__Consumer.java
+++ b/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/main/java/__name__Consumer.java
@@ -16,35 +16,64 @@
## ------------------------------------------------------------------------
package ${package};
-import java.util.Date;
-
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.support.ScheduledPollConsumer;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.support.DefaultConsumer;
+
+import java.util.concurrent.ExecutorService;
/**
* The ${name} consumer.
*/
-public class ${name}Consumer extends ScheduledPollConsumer {
+public class ${name}Consumer extends DefaultConsumer {
private final ${name}Endpoint endpoint;
+ private final EventBusHelper eventBusHelper;
+
+ private ExecutorService executorService;
public ${name}Consumer(${name}Endpoint endpoint, Processor processor) {
super(endpoint, processor);
this.endpoint = endpoint;
+ eventBusHelper = EventBusHelper.getInstance();
}
@Override
- protected int poll() throws Exception {
- Exchange exchange = endpoint.createExchange();
+ protected void doStart() throws Exception {
+ super.doStart();
+
+ // start a single threaded pool to monitor events
+ executorService = endpoint.createExecutor();
+
+ // submit task to the thread pool
+ executorService.submit(() -> {
+ // subscribe to an event
+ eventBusHelper.subscribe(this::onEventListener);
+ });
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ super.doStop();
+
+ if(log.isTraceEnabled()){
+ log.trace("Shutting down consumer gracefully");
+ }
+
+ // shutdown the thread pool gracefully
+ getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService);
+ }
+
+ private void onEventListener(final Object event) {
+ final Exchange exchange = endpoint.createExchange();
- // create a message body
- Date now = new Date();
- exchange.getIn().setBody("Hello World! The time is " + now);
+ exchange.getIn().setBody("Hello World! The time is " + event);
try {
// send message to next processor in the route
getProcessor().process(exchange);
- return 1; // number of messages polled
+ } catch (Exception ex){
+ exchange.setException(new RuntimeCamelException("Message forwarding failed", ex));
} finally {
// log exception if an exception occurred and was not handled
if (exchange.getException() != null) {
diff --git a/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/main/java/__name__Endpoint.java b/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/main/java/__name__Endpoint.java
index c06d75e..6aeceb4 100644
--- a/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/main/java/__name__Endpoint.java
+++ b/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/main/java/__name__Endpoint.java
@@ -25,6 +25,8 @@ import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
+import java.util.concurrent.ExecutorService;
+
/**
* Represents a ${name} endpoint.
*/
@@ -76,4 +78,8 @@ public class ${name}Endpoint extends DefaultEndpoint {
public int getOption() {
return option;
}
+
+ public ExecutorService createExecutor(){
+ return getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "${name}Consumer");
+ }
}
diff --git a/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/test/java/__name__ComponentTest.java b/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/test/java/__name__ComponentTest.java
index 6f723bd..35e6667 100644
--- a/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/test/java/__name__ComponentTest.java
+++ b/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/test/java/__name__ComponentTest.java
@@ -21,14 +21,23 @@ import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
+import java.util.Date;
+import java.util.Timer;
+import java.util.TimerTask;
+
public class ${name}ComponentTest extends CamelTestSupport {
+ final private EventBusHelper eventBusHelper = EventBusHelper.getInstance();
+
@Test
public void test${name}() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedMinimumMessageCount(1);
-
- assertMockEndpointsSatisfied();
+ mock.expectedMinimumMessageCount(20);
+
+ // Trigger events to subscribers
+ simulateEventTrigger();
+
+ mock.await();
}
@Override
@@ -41,4 +50,16 @@ public class ${name}ComponentTest extends CamelTestSupport {
}
};
}
+
+ private void simulateEventTrigger() {
+ final TimerTask task = new TimerTask() {
+ @Override
+ public void run() {
+ final Date now = new Date();
+ eventBusHelper.publish(now);
+ }
+ };
+
+ new Timer().scheduleAtFixedRate(task, 1L, 1000L);
+ }
}