You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/11/01 14:17:02 UTC

[camel] branch master updated: CAMEL-11975: camel-connector - Allow to set before/after consumer/producer processors per endpoint

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new ceb8669  CAMEL-11975: camel-connector - Allow to set before/after consumer/producer processors per endpoint
ceb8669 is described below

commit ceb8669704f87561c53a4324893b5fb31302e704
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Nov 1 15:16:49 2017 +0100

    CAMEL-11975: camel-connector - Allow to set before/after consumer/producer processors per endpoint
---
 .../connector/DefaultConnectorComponent.java       |  7 ++-
 .../connector/DefaultConnectorEndpoint.java        | 68 ++++++++++++++++++++--
 .../src/main/java/org/foo/FooBarWineRoute.java     |  2 +
 .../main/java/org/foo/connector/FooComponent.java  |  3 +
 4 files changed, 75 insertions(+), 5 deletions(-)

diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorComponent.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorComponent.java
index 3d56ce3..f46b708 100644
--- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorComponent.java
+++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorComponent.java
@@ -133,7 +133,7 @@ public abstract class DefaultConnectorComponent extends DefaultComponent impleme
             log.info("Connector resolved: {} -> {}", sanitizeUri(uri), sanitizeUri(delegateUri));
         }
 
-        Endpoint answer;
+        DefaultConnectorEndpoint answer;
         // are we scheduler based?
         if ("timer".equals(model.getScheduler())) {
             SchedulerTimerConnectorEndpoint endpoint = new SchedulerTimerConnectorEndpoint(uri, this, delegate, model.getInputDataType(), model.getOutputDataType());
@@ -143,6 +143,11 @@ public abstract class DefaultConnectorComponent extends DefaultComponent impleme
             answer = new DefaultConnectorEndpoint(uri, this, delegate, model.getInputDataType(), model.getOutputDataType());
         }
 
+        answer.setBeforeProducer(getBeforeProducer());
+        answer.setAfterProducer(getAfterProducer());
+        answer.setBeforeConsumer(getBeforeConsumer());
+        answer.setAfterConsumer(getAfterConsumer());
+
         // clean-up parameters so that validation won't fail later on
         // in DefaultConnectorComponent.validateParameters()
         parameters.clear();
diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
index 9e68f0f..6952915 100644
--- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
+++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
@@ -33,6 +33,10 @@ public class DefaultConnectorEndpoint extends DefaultEndpoint implements Delegat
     private final Endpoint endpoint;
     private final DataType inputDataType;
     private final DataType outputDataType;
+    private Processor beforeProducer;
+    private Processor afterProducer;
+    private Processor beforeConsumer;
+    private Processor afterConsumer;
 
     public DefaultConnectorEndpoint(String endpointUri, ConnectorComponent component, Endpoint endpoint,
                                     DataType inputDataType, DataType outputDataType) {
@@ -46,8 +50,8 @@ public class DefaultConnectorEndpoint extends DefaultEndpoint implements Delegat
     public Producer createProducer() throws Exception {
         final Producer producer = endpoint.createProducer();
 
-        final Processor beforeProducer = getComponent().getBeforeProducer();
-        final Processor afterProducer = getComponent().getAfterProducer();
+        final Processor beforeProducer = getBeforeProducer();
+        final Processor afterProducer = getAfterProducer();
 
         // use a pipeline to process before, producer, after in that order
         // create producer with the pipeline
@@ -58,8 +62,8 @@ public class DefaultConnectorEndpoint extends DefaultEndpoint implements Delegat
 
     @Override
     public Consumer createConsumer(final Processor processor) throws Exception {
-        final Processor beforeConsumer = getComponent().getBeforeConsumer();
-        final Processor afterConsumer = getComponent().getAfterConsumer();
+        final Processor beforeConsumer = getBeforeConsumer();
+        final Processor afterConsumer = getAfterConsumer();
 
         // use a pipeline to process before, processor, after in that order
         // create consumer with the pipeline
@@ -100,6 +104,62 @@ public class DefaultConnectorEndpoint extends DefaultEndpoint implements Delegat
         return outputDataType;
     }
 
+    /**
+     * Gets the processor used to perform custom processing before the producer is sending the message.
+     */
+    public Processor getBeforeProducer() {
+        return beforeProducer;
+    }
+
+    /**
+     * To perform custom processing before the producer is sending the message.
+     */
+    public void setBeforeProducer(Processor beforeProducer) {
+        this.beforeProducer = beforeProducer;
+    }
+
+    /**
+     * Gets the processor used to perform custom processing after the producer has sent the message and received any reply (if InOut).
+     */
+    public Processor getAfterProducer() {
+        return afterProducer;
+    }
+
+    /**
+     * To perform custom processing after the producer has sent the message and received any reply (if InOut).
+     */
+    public void setAfterProducer(Processor afterProducer) {
+        this.afterProducer = afterProducer;
+    }
+
+    /**
+     * Gets the processor used to perform custom processing when the consumer has just received a new incoming message.
+     */
+    public Processor getBeforeConsumer() {
+        return beforeConsumer;
+    }
+
+    /**
+     * To perform custom processing when the consumer has just received a new incoming message.
+     */
+    public void setBeforeConsumer(Processor beforeConsumer) {
+        this.beforeConsumer = beforeConsumer;
+    }
+
+    /**
+     * Gets the processor used to perform custom processing when the consumer is about to send back a reply message to the caller (if InOut).
+     */
+    public Processor getAfterConsumer() {
+        return afterConsumer;
+    }
+
+    /**
+     * To perform custom processing when the consumer is about to send back a reply message to the caller (if InOut).
+     */
+    public void setAfterConsumer(Processor afterConsumer) {
+        this.afterConsumer = afterConsumer;
+    }
+
     @Override
     protected void doStart() throws Exception {
         super.doStart();
diff --git a/connectors/examples/foo-bar-wine-example/src/main/java/org/foo/FooBarWineRoute.java b/connectors/examples/foo-bar-wine-example/src/main/java/org/foo/FooBarWineRoute.java
index 62cd18a..b066f6d 100644
--- a/connectors/examples/foo-bar-wine-example/src/main/java/org/foo/FooBarWineRoute.java
+++ b/connectors/examples/foo-bar-wine-example/src/main/java/org/foo/FooBarWineRoute.java
@@ -26,10 +26,12 @@ public class FooBarWineRoute extends RouteBuilder {
     @Override
     public void configure() throws Exception {
         from("foo:ThirstyBear?period=2000")
+            .log("Who is this: ${header.whoami}")
             .to("wine:Wine?amount=2")
             .log("ThirstyBear ordered ${body}");
 
         from("foo:Moes?period=5000")
+            .log("Who is this: ${header.whoami}")
             .to("bar:Beer?amount=5&celebrity=true")
             .log("Moes ordered ${body}");
     }
diff --git a/connectors/examples/foo-connector/src/main/java/org/foo/connector/FooComponent.java b/connectors/examples/foo-connector/src/main/java/org/foo/connector/FooComponent.java
index 20318f5..04861f2 100644
--- a/connectors/examples/foo-connector/src/main/java/org/foo/connector/FooComponent.java
+++ b/connectors/examples/foo-connector/src/main/java/org/foo/connector/FooComponent.java
@@ -22,6 +22,9 @@ public class FooComponent extends DefaultConnectorComponent {
 
     public FooComponent() {
         super("foo", "org.foo.connector.FooComponent");
+
+        // show how you can add a fixed header
+        setBeforeConsumer(e -> e.getIn().setHeader("whoami", "I am foo"));
     }
 
 }
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
['"commits@camel.apache.org" <co...@camel.apache.org>'].