You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/08/04 09:01:36 UTC

[camel] branch master updated: CAMEL-13783: Fix the camel-archetype-component to inherit from DefaultConsumer instead of ScheduledPollConsumer (#3059)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 82581fb  CAMEL-13783: Fix the camel-archetype-component to inherit from DefaultConsumer instead of ScheduledPollConsumer (#3059)
82581fb is described below

commit 82581fb4f08f03527d82b792132c5b261e9dcca5
Author: Omar Al-Safi <om...@gmail.com>
AuthorDate: Sun Aug 4 11:01:21 2019 +0200

    CAMEL-13783: Fix the camel-archetype-component to inherit from DefaultConsumer instead of ScheduledPollConsumer (#3059)
    
    * Change to DefaultConsumer in camel-archetype-component
    
    * Change to DefaultConsumer import package
    
    Signed-off-by: Omar Al-Safi <om...@gmail.com>
    
    * Add event bus helper to broadcast the events
    
    Signed-off-by: Omar Al-Safi <om...@gmail.com>
---
 .../java/EventBusHelper.java}                      | 55 +++++++++++++---------
 .../src/main/java/__name__Consumer.java            | 49 +++++++++++++++----
 .../src/main/java/__name__Endpoint.java            |  6 +++
 .../src/test/java/__name__ComponentTest.java       | 27 +++++++++--
 4 files changed, 101 insertions(+), 36 deletions(-)

diff --git a/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/test/java/__name__ComponentTest.java b/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/main/java/EventBusHelper.java
similarity index 52%
copy from archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/test/java/__name__ComponentTest.java
copy to archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/main/java/EventBusHelper.java
index 6f723bd..68ea9cd 100644
--- a/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/test/java/__name__ComponentTest.java
+++ b/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/main/java/EventBusHelper.java
@@ -16,29 +16,38 @@
 ## ------------------------------------------------------------------------
 package ${package};
 
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.test.junit4.CamelTestSupport;
-import org.junit.Test;
-
-public class ${name}ComponentTest extends CamelTestSupport {
-
-    @Test
-    public void test${name}() throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedMinimumMessageCount(1);       
-        
-        assertMockEndpointsSatisfied();
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+
+public class EventBusHelper {
+
+    private static EventBusHelper INSTANCE;
+
+    final private Set<Consumer> subscribers = ConcurrentHashMap.newKeySet();
+
+    private EventBusHelper(){ }
+
+    public static EventBusHelper getInstance(){
+        if (INSTANCE == null){
+            INSTANCE = new EventBusHelper();
+        }
+
+        return INSTANCE;
     }
 
-    @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            public void configure() {
-                from("${scheme}://foo")
-                  .to("${scheme}://bar")
-                  .to("mock:result");
-            }
-        };
+    public <T> void subscribe(final Consumer<T> subscriber) {
+        subscribers.add(subscriber);
     }
-}
+
+    @SuppressWarnings("unchecked")
+    public <T> void publish(final T event){
+        // Notify all subscribers
+        subscribers.forEach(consumer -> publishSingleEvent(event, consumer));
+    }
+
+    private <T> void publishSingleEvent(final T event, final Consumer<T> subscriber){
+        subscriber.accept(event);
+    }
+
+}
\ No newline at end of file
diff --git a/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/main/java/__name__Consumer.java b/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/main/java/__name__Consumer.java
index 42cf771..9611c6a 100644
--- a/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/main/java/__name__Consumer.java
+++ b/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/main/java/__name__Consumer.java
@@ -16,35 +16,64 @@
 ## ------------------------------------------------------------------------
 package ${package};
 
-import java.util.Date;
-
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.support.ScheduledPollConsumer;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.support.DefaultConsumer;
+
+import java.util.concurrent.ExecutorService;
 
 /**
  * The ${name} consumer.
  */
-public class ${name}Consumer extends ScheduledPollConsumer {
+public class ${name}Consumer extends DefaultConsumer {
     private final ${name}Endpoint endpoint;
+    private final EventBusHelper eventBusHelper;
+
+    private ExecutorService executorService;
 
     public ${name}Consumer(${name}Endpoint endpoint, Processor processor) {
         super(endpoint, processor);
         this.endpoint = endpoint;
+        eventBusHelper = EventBusHelper.getInstance();
     }
 
     @Override
-    protected int poll() throws Exception {
-        Exchange exchange = endpoint.createExchange();
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        // start a single threaded pool to monitor events
+        executorService = endpoint.createExecutor();
+
+        // submit task to the thread pool
+        executorService.submit(() -> {
+            // subscribe to an event
+            eventBusHelper.subscribe(this::onEventListener);
+        });
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+
+        if(log.isTraceEnabled()){
+            log.trace("Shutting down consumer gracefully");
+        }
+
+        // shutdown the thread pool gracefully
+        getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService);
+    }
+
+    private void onEventListener(final Object event) {
+        final Exchange exchange = endpoint.createExchange();
 
-        // create a message body
-        Date now = new Date();
-        exchange.getIn().setBody("Hello World! The time is " + now);
+        exchange.getIn().setBody("Hello World! The time is " + event);
 
         try {
             // send message to next processor in the route
             getProcessor().process(exchange);
-            return 1; // number of messages polled
+        } catch (Exception ex){
+            exchange.setException(new RuntimeCamelException("Message forwarding failed", ex));
         } finally {
             // log exception if an exception occurred and was not handled
             if (exchange.getException() != null) {
diff --git a/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/main/java/__name__Endpoint.java b/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/main/java/__name__Endpoint.java
index c06d75e..6aeceb4 100644
--- a/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/main/java/__name__Endpoint.java
+++ b/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/main/java/__name__Endpoint.java
@@ -25,6 +25,8 @@ import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 
+import java.util.concurrent.ExecutorService;
+
 /**
  * Represents a ${name} endpoint.
  */
@@ -76,4 +78,8 @@ public class ${name}Endpoint extends DefaultEndpoint {
     public int getOption() {
         return option;
     }
+
+    public ExecutorService createExecutor(){
+        return getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "${name}Consumer");
+    }
 }
diff --git a/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/test/java/__name__ComponentTest.java b/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/test/java/__name__ComponentTest.java
index 6f723bd..35e6667 100644
--- a/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/test/java/__name__ComponentTest.java
+++ b/archetypes/camel-archetype-component/src/main/resources/archetype-resources/src/test/java/__name__ComponentTest.java
@@ -21,14 +21,23 @@ import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
 
+import java.util.Date;
+import java.util.Timer;
+import java.util.TimerTask;
+
 public class ${name}ComponentTest extends CamelTestSupport {
 
+    final private EventBusHelper eventBusHelper = EventBusHelper.getInstance();
+
     @Test
     public void test${name}() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedMinimumMessageCount(1);       
-        
-        assertMockEndpointsSatisfied();
+        mock.expectedMinimumMessageCount(20);
+
+        // Trigger events to subscribers
+        simulateEventTrigger();
+
+        mock.await();
     }
 
     @Override
@@ -41,4 +50,16 @@ public class ${name}ComponentTest extends CamelTestSupport {
             }
         };
     }
+
+    private void simulateEventTrigger() {
+        final TimerTask task = new TimerTask() {
+            @Override
+            public void run() {
+                final Date now = new Date();
+                eventBusHelper.publish(now);
+            }
+        };
+
+        new Timer().scheduleAtFixedRate(task, 1L, 1000L);
+    }
 }