You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/04/07 12:32:26 UTC
camel git commit: Camel connector allow to do custom logic before
producer or consumer does anything.
Repository: camel
Updated Branches:
refs/heads/master e8bfc8cfb -> 38c3cfa50
Camel connector allow to do custom logic before producer or consumer does anything.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/38c3cfa5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/38c3cfa5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/38c3cfa5
Branch: refs/heads/master
Commit: 38c3cfa505c75dc52ea3cc80a1723250dfd925dc
Parents: e8bfc8c
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Apr 7 14:32:07 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Apr 7 14:32:19 2017 +0200
----------------------------------------------------------------------
.../component/connector/ConnectorComponent.java | 28 +++++++
.../component/connector/ConnectorProducer.java | 82 ++++++++++++++++++++
.../connector/DefaultConnectorComponent.java | 45 +++++++++++
.../connector/DefaultConnectorEndpoint.java | 12 ++-
4 files changed, 164 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/38c3cfa5/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorComponent.java
----------------------------------------------------------------------
diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorComponent.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorComponent.java
index 4a7dbce..901ed0b 100644
--- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorComponent.java
+++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorComponent.java
@@ -20,6 +20,7 @@ import java.net.URISyntaxException;
import java.util.Map;
import org.apache.camel.Component;
+import org.apache.camel.Processor;
import org.apache.camel.catalog.CamelCatalog;
/**
@@ -76,4 +77,31 @@ public interface ConnectorComponent extends Component {
*/
void setComponentOptions(Map<String, Object> baseComponentOptions);
+ /**
+ * To perform custom processing before the producer sends the message.
+ *
+ * @param processor the custom processor to run before the producer
+ */
+ void setBeforeProducer(Processor processor);
+ /**
+ * Gets the custom processor to run before the producer sends the message.
+ */
+ Processor getBeforeProducer();
+
+ /**
+ * To perform custom processing after the producer has sent the message and received any reply (if InOut).
+ *
+ * @param processor the custom processor to run after the producer
+ */
+ void setAfterProducer(Processor processor);
+ /**
+ * Gets the custom processor to run after the producer has sent the message.
+ */
+ Processor getAfterProducer();
+
+ /**
+ * To perform custom processing when the consumer has just received a new incoming message.
+ *
+ * @param processor the custom processor to run when the consumer has received a message
+ */
+ void setBeforeConsumer(Processor processor);
+ /**
+ * Gets the custom processor to run when the consumer has just received a new incoming message.
+ */
+ Processor getBeforeConsumer();
+
+ /**
+ * To perform custom processing when the consumer is about to send back a reply message to the caller (if InOut).
+ *
+ * @param processor the custom processor to run before the consumer sends back a reply
+ */
+ void setAfterConsumer(Processor processor);
+ /**
+ * Gets the custom processor to run when the consumer is about to send back a reply message.
+ */
+ Processor getAfterConsumer();
}
http://git-wip-us.apache.org/repos/asf/camel/blob/38c3cfa5/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
----------------------------------------------------------------------
diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
new file mode 100644
index 0000000..b0d7225
--- /dev/null
+++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.connector;
+
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * A {@link Producer} wrapper that optionally runs a custom {@link Processor}
+ * before and after the wrapped producer processes the exchange.
+ * <p/>
+ * All lifecycle callbacks (start, stop, suspend, resume, shutdown) are forwarded
+ * to the wrapped producer only.
+ */
+public class ConnectorProducer extends DefaultProducer {
+
+    private final Producer delegate;
+    private final Processor before;
+    private final Processor after;
+
+    /**
+     * @param endpoint       the endpoint handed to {@link DefaultProducer}
+     * @param producer       the producer doing the actual work
+     * @param beforeProducer processor to run before the producer, may be <tt>null</tt>
+     * @param afterProducer  processor to run after the producer, may be <tt>null</tt>
+     */
+    public ConnectorProducer(Endpoint endpoint, Producer producer, Processor beforeProducer, Processor afterProducer) {
+        super(endpoint);
+        this.delegate = producer;
+        this.before = beforeProducer;
+        this.after = afterProducer;
+    }
+
+    /**
+     * Runs the before processor (if any), then the wrapped producer, then the after processor (if any).
+     * An exception thrown by any step propagates and the remaining steps are skipped.
+     *
+     * @throws RejectedExecutionException if this producer is not allowed to run
+     */
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        if (!isRunAllowed()) {
+            throw new RejectedExecutionException();
+        }
+
+        invokeIfPresent(before, exchange);
+        delegate.process(exchange);
+        invokeIfPresent(after, exchange);
+    }
+
+    /**
+     * Invokes the given hook with the exchange, doing nothing when the hook is <tt>null</tt>.
+     */
+    private static void invokeIfPresent(Processor hook, Exchange exchange) throws Exception {
+        if (hook == null) {
+            return;
+        }
+        hook.process(exchange);
+    }
+
+    // lifecycle: everything below is forwarded to the wrapped producer
+
+    @Override
+    protected void doStart() throws Exception {
+        ServiceHelper.startService(delegate);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopService(delegate);
+    }
+
+    @Override
+    protected void doSuspend() throws Exception {
+        ServiceHelper.suspendService(delegate);
+    }
+
+    @Override
+    protected void doResume() throws Exception {
+        ServiceHelper.resumeService(delegate);
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        ServiceHelper.stopAndShutdownService(delegate);
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/38c3cfa5/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorComponent.java
----------------------------------------------------------------------
diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorComponent.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorComponent.java
index f841ec0..4f3fd15 100644
--- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorComponent.java
+++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorComponent.java
@@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.camel.Component;
import org.apache.camel.ComponentVerifier;
import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
import org.apache.camel.VerifiableComponent;
import org.apache.camel.catalog.CamelCatalog;
import org.apache.camel.catalog.DefaultCamelCatalog;
@@ -47,6 +48,10 @@ public abstract class DefaultConnectorComponent extends DefaultComponent impleme
private final String componentName;
private final ConnectorModel model;
private Map<String, Object> componentOptions;
+ private Processor beforeProducer; // custom processor for before the producer sends; null until set
+ private Processor afterProducer; // custom processor for after the producer has sent; null until set
+ private Processor beforeConsumer; // custom processor for when the consumer has received a message; null until set
+ private Processor afterConsumer; // custom processor for when the consumer is about to reply; null until set
protected DefaultConnectorComponent(String componentName, String className) {
this.componentName = componentName;
@@ -197,6 +202,46 @@ public abstract class DefaultConnectorComponent extends DefaultComponent impleme
super.doStop();
}
+ /**
+  * Returns the processor to run before the producer sends the message,
+  * or <tt>null</tt> when none has been set.
+  */
+ @Override
+ public Processor getBeforeProducer() {
+     return this.beforeProducer;
+ }
+
+ /**
+  * Stores the processor to run before the producer sends the message.
+  */
+ @Override
+ public void setBeforeProducer(final Processor beforeProducer) {
+     this.beforeProducer = beforeProducer;
+ }
+
+ /**
+  * Returns the processor to run after the producer has sent the message,
+  * or <tt>null</tt> when none has been set.
+  */
+ @Override
+ public Processor getAfterProducer() {
+     return this.afterProducer;
+ }
+
+ /**
+  * Stores the processor to run after the producer has sent the message.
+  */
+ @Override
+ public void setAfterProducer(final Processor afterProducer) {
+     this.afterProducer = afterProducer;
+ }
+
+ /**
+  * Returns the processor intended to run when the consumer has just received
+  * a new incoming message, or <tt>null</tt> when none has been set.
+  */
+ @Override
+ public Processor getBeforeConsumer() {
+     return this.beforeConsumer;
+ }
+
+ /**
+  * Stores the processor intended to run when the consumer has just received
+  * a new incoming message.
+  */
+ @Override
+ public void setBeforeConsumer(final Processor beforeConsumer) {
+     this.beforeConsumer = beforeConsumer;
+ }
+
+ /**
+  * Returns the processor intended to run when the consumer is about to send
+  * back a reply message, or <tt>null</tt> when none has been set.
+  */
+ @Override
+ public Processor getAfterConsumer() {
+     return this.afterConsumer;
+ }
+
+ /**
+  * Stores the processor intended to run when the consumer is about to send
+  * back a reply message.
+  */
+ @Override
+ public void setAfterConsumer(final Processor afterConsumer) {
+     this.afterConsumer = afterConsumer;
+ }
+
// ***************************************
// Helpers
// ***************************************
http://git-wip-us.apache.org/repos/asf/camel/blob/38c3cfa5/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
----------------------------------------------------------------------
diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
index 6550907..ba9e93b 100644
--- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
+++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
@@ -16,7 +16,6 @@
*/
package org.apache.camel.component.connector;
-import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.DelegateEndpoint;
import org.apache.camel.Endpoint;
@@ -44,12 +43,19 @@ public class DefaultConnectorEndpoint extends DefaultEndpoint implements Delegat
@Override
public Producer createProducer() throws Exception {
- return endpoint.createProducer();
+ Producer producer = endpoint.createProducer(); // the real producer comes from the delegate endpoint
+ return new ConnectorProducer(endpoint, producer, getComponent().getBeforeProducer(), getComponent().getAfterProducer()); // wrap it so the component's before/after processors (either may be null) run around it; note the wrapper is given the delegate endpoint, not this endpoint
}
@Override
public Consumer createConsumer(Processor processor) throws Exception {
- return endpoint.createConsumer(processor);
+ Consumer answer = endpoint.createConsumer(processor); // NOTE(review): getBeforeConsumer()/getAfterConsumer() are not applied here; the delegate's consumer is returned unwrapped
+ return answer;
+ }
+
+ @Override
+ public ConnectorComponent getComponent() {
+ return (ConnectorComponent) super.getComponent(); // narrows the return type; assumes the owning component is a ConnectorComponent - TODO confirm, otherwise this throws ClassCastException
}
@Override