You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/11/01 14:17:02 UTC
[camel] branch master updated: CAMEL-11975: camel-connector - Allow
to set before/after consumer/producer processors per endpoint
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new ceb8669 CAMEL-11975: camel-connector - Allow to set before/after consumer/producer processors per endpoint
ceb8669 is described below
commit ceb8669704f87561c53a4324893b5fb31302e704
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Wed Nov 1 15:16:49 2017 +0100
CAMEL-11975: camel-connector - Allow to set before/after consumer/producer processors per endpoint
---
.../connector/DefaultConnectorComponent.java | 7 ++-
.../connector/DefaultConnectorEndpoint.java | 68 ++++++++++++++++++++--
.../src/main/java/org/foo/FooBarWineRoute.java | 2 +
.../main/java/org/foo/connector/FooComponent.java | 3 +
4 files changed, 75 insertions(+), 5 deletions(-)
diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorComponent.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorComponent.java
index 3d56ce3..f46b708 100644
--- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorComponent.java
+++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorComponent.java
@@ -133,7 +133,7 @@ public abstract class DefaultConnectorComponent extends DefaultComponent impleme
log.info("Connector resolved: {} -> {}", sanitizeUri(uri), sanitizeUri(delegateUri));
}
- Endpoint answer;
+ DefaultConnectorEndpoint answer;
// are we scheduler based?
if ("timer".equals(model.getScheduler())) {
SchedulerTimerConnectorEndpoint endpoint = new SchedulerTimerConnectorEndpoint(uri, this, delegate, model.getInputDataType(), model.getOutputDataType());
@@ -143,6 +143,11 @@ public abstract class DefaultConnectorComponent extends DefaultComponent impleme
answer = new DefaultConnectorEndpoint(uri, this, delegate, model.getInputDataType(), model.getOutputDataType());
}
+ answer.setBeforeProducer(getBeforeProducer());
+ answer.setAfterProducer(getAfterProducer());
+ answer.setBeforeConsumer(getBeforeConsumer());
+ answer.setAfterConsumer(getAfterConsumer());
+
// clean-up parameters so that validation won't fail later on
// in DefaultConnectorComponent.validateParameters()
parameters.clear();
diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
index 9e68f0f..6952915 100644
--- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
+++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
@@ -33,6 +33,10 @@ public class DefaultConnectorEndpoint extends DefaultEndpoint implements Delegat
private final Endpoint endpoint;
private final DataType inputDataType;
private final DataType outputDataType;
+ private Processor beforeProducer;
+ private Processor afterProducer;
+ private Processor beforeConsumer;
+ private Processor afterConsumer;
public DefaultConnectorEndpoint(String endpointUri, ConnectorComponent component, Endpoint endpoint,
DataType inputDataType, DataType outputDataType) {
@@ -46,8 +50,8 @@ public class DefaultConnectorEndpoint extends DefaultEndpoint implements Delegat
public Producer createProducer() throws Exception {
final Producer producer = endpoint.createProducer();
- final Processor beforeProducer = getComponent().getBeforeProducer();
- final Processor afterProducer = getComponent().getAfterProducer();
+ final Processor beforeProducer = getBeforeProducer();
+ final Processor afterProducer = getAfterProducer();
// use a pipeline to process before, producer, after in that order
// create producer with the pipeline
@@ -58,8 +62,8 @@ public class DefaultConnectorEndpoint extends DefaultEndpoint implements Delegat
@Override
public Consumer createConsumer(final Processor processor) throws Exception {
- final Processor beforeConsumer = getComponent().getBeforeConsumer();
- final Processor afterConsumer = getComponent().getAfterConsumer();
+ final Processor beforeConsumer = getBeforeConsumer();
+ final Processor afterConsumer = getAfterConsumer();
// use a pipeline to process before, processor, after in that order
// create consumer with the pipeline
@@ -100,6 +104,62 @@ public class DefaultConnectorEndpoint extends DefaultEndpoint implements Delegat
return outputDataType;
}
+ /**
+ * Gets the processor used to perform custom processing before the producer is sending the message.
+ */
+ public Processor getBeforeProducer() {
+ return beforeProducer;
+ }
+
+ /**
+ * To perform custom processing before the producer is sending the message.
+ */
+ public void setBeforeProducer(Processor beforeProducer) {
+ this.beforeProducer = beforeProducer;
+ }
+
+ /**
+ * Gets the processor used to perform custom processing after the producer has sent the message and received any reply (if InOut).
+ */
+ public Processor getAfterProducer() {
+ return afterProducer;
+ }
+
+ /**
+ * To perform custom processing after the producer has sent the message and received any reply (if InOut).
+ */
+ public void setAfterProducer(Processor afterProducer) {
+ this.afterProducer = afterProducer;
+ }
+
+ /**
+ * Gets the processor used to perform custom processing when the consumer has just received a new incoming message.
+ */
+ public Processor getBeforeConsumer() {
+ return beforeConsumer;
+ }
+
+ /**
+ * To perform custom processing when the consumer has just received a new incoming message.
+ */
+ public void setBeforeConsumer(Processor beforeConsumer) {
+ this.beforeConsumer = beforeConsumer;
+ }
+
+ /**
+ * Gets the processor used to perform custom processing when the consumer is about to send back a reply message to the caller (if InOut).
+ */
+ public Processor getAfterConsumer() {
+ return afterConsumer;
+ }
+
+ /**
+ * To perform custom processing when the consumer is about to send back a reply message to the caller (if InOut).
+ */
+ public void setAfterConsumer(Processor afterConsumer) {
+ this.afterConsumer = afterConsumer;
+ }
+
@Override
protected void doStart() throws Exception {
super.doStart();
diff --git a/connectors/examples/foo-bar-wine-example/src/main/java/org/foo/FooBarWineRoute.java b/connectors/examples/foo-bar-wine-example/src/main/java/org/foo/FooBarWineRoute.java
index 62cd18a..b066f6d 100644
--- a/connectors/examples/foo-bar-wine-example/src/main/java/org/foo/FooBarWineRoute.java
+++ b/connectors/examples/foo-bar-wine-example/src/main/java/org/foo/FooBarWineRoute.java
@@ -26,10 +26,12 @@ public class FooBarWineRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
from("foo:ThirstyBear?period=2000")
+ .log("Who is this: ${header.whoami}")
.to("wine:Wine?amount=2")
.log("ThirstyBear ordered ${body}");
from("foo:Moes?period=5000")
+ .log("Who is this: ${header.whoami}")
.to("bar:Beer?amount=5&celebrity=true")
.log("Moes ordered ${body}");
}
diff --git a/connectors/examples/foo-connector/src/main/java/org/foo/connector/FooComponent.java b/connectors/examples/foo-connector/src/main/java/org/foo/connector/FooComponent.java
index 20318f5..04861f2 100644
--- a/connectors/examples/foo-connector/src/main/java/org/foo/connector/FooComponent.java
+++ b/connectors/examples/foo-connector/src/main/java/org/foo/connector/FooComponent.java
@@ -22,6 +22,9 @@ public class FooComponent extends DefaultConnectorComponent {
public FooComponent() {
super("foo", "org.foo.connector.FooComponent");
+
+ // show how you can add a fixed header
+ setBeforeConsumer(e -> e.getIn().setHeader("whoami", "I am foo"));
}
}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
['"commits@camel.apache.org" <co...@camel.apache.org>'].