You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/04/07 12:32:26 UTC

camel git commit: Camel connector now allows custom logic to run before the producer or consumer does anything.

Repository: camel
Updated Branches:
  refs/heads/master e8bfc8cfb -> 38c3cfa50


Camel connector now allows custom logic to run before the producer or consumer does anything.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/38c3cfa5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/38c3cfa5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/38c3cfa5

Branch: refs/heads/master
Commit: 38c3cfa505c75dc52ea3cc80a1723250dfd925dc
Parents: e8bfc8c
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Apr 7 14:32:07 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Apr 7 14:32:19 2017 +0200

----------------------------------------------------------------------
 .../component/connector/ConnectorComponent.java | 28 +++++++
 .../component/connector/ConnectorProducer.java  | 82 ++++++++++++++++++++
 .../connector/DefaultConnectorComponent.java    | 45 +++++++++++
 .../connector/DefaultConnectorEndpoint.java     | 12 ++-
 4 files changed, 164 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/38c3cfa5/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorComponent.java
----------------------------------------------------------------------
diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorComponent.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorComponent.java
index 4a7dbce..901ed0b 100644
--- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorComponent.java
+++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorComponent.java
@@ -20,6 +20,7 @@ import java.net.URISyntaxException;
 import java.util.Map;
 
 import org.apache.camel.Component;
+import org.apache.camel.Processor;
 import org.apache.camel.catalog.CamelCatalog;
 
 /**
@@ -76,4 +77,31 @@ public interface ConnectorComponent extends Component {
      */
     void setComponentOptions(Map<String, Object> baseComponentOptions);
 
+    /**
+     * Sets the processor that performs custom processing before the producer sends the message.
+     */
+    void setBeforeProducer(Processor processor);
+    /** Returns the processor given to {@link #setBeforeProducer(Processor)}. */
+    Processor getBeforeProducer();
+
+    /**
+     * Sets the processor that runs after the producer has sent the message and received any reply (if InOut).
+     */
+    void setAfterProducer(Processor processor);
+    /** Returns the processor given to {@link #setAfterProducer(Processor)}. */
+    Processor getAfterProducer();
+
+    /**
+     * Sets the processor intended to run when the consumer has just received a new incoming message.
+     */
+    void setBeforeConsumer(Processor processor);
+    /** Returns the processor given to {@link #setBeforeConsumer(Processor)}. */
+    Processor getBeforeConsumer();
+
+    /**
+     * Sets the processor intended to run when the consumer is about to send a reply back to the caller (if InOut).
+     */
+    void setAfterConsumer(Processor processor);
+    /** Returns the processor given to {@link #setAfterConsumer(Processor)}. */
+    Processor getAfterConsumer();
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/38c3cfa5/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
----------------------------------------------------------------------
diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
new file mode 100644
index 0000000..b0d7225
--- /dev/null
+++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.connector;
+
+import java.util.concurrent.RejectedExecutionException;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.util.ServiceHelper;
+
+/** Producer that wraps another producer and runs optional before/after processors around each call to it. */
+public class ConnectorProducer extends DefaultProducer {
+    private final Producer delegate;
+    private final Processor before;
+    private final Processor after;
+
+    public ConnectorProducer(Endpoint endpoint, Producer producer, Processor beforeProducer, Processor afterProducer) {
+        super(endpoint);
+        this.delegate = producer;
+        this.before = beforeProducer;
+        this.after = afterProducer;
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        if (isRunAllowed()) {
+            invokeIfSet(before, exchange);
+            delegate.process(exchange);
+            invokeIfSet(after, exchange);
+        } else {
+            throw new RejectedExecutionException();
+        }
+    }
+
+    private static void invokeIfSet(Processor hook, Exchange exchange) throws Exception {
+        if (hook != null) { // a hook that was passed as null is simply skipped
+            hook.process(exchange);
+        }
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        ServiceHelper.startService(delegate);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopService(delegate);
+    }
+
+    @Override
+    protected void doSuspend() throws Exception {
+        ServiceHelper.suspendService(delegate);
+    }
+
+    @Override
+    protected void doResume() throws Exception {
+        ServiceHelper.resumeService(delegate);
+    }
+
+    @Override
+    protected void doShutdown() throws Exception {
+        ServiceHelper.stopAndShutdownService(delegate);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/38c3cfa5/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorComponent.java
----------------------------------------------------------------------
diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorComponent.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorComponent.java
index f841ec0..4f3fd15 100644
--- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorComponent.java
+++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorComponent.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.camel.Component;
 import org.apache.camel.ComponentVerifier;
 import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
 import org.apache.camel.VerifiableComponent;
 import org.apache.camel.catalog.CamelCatalog;
 import org.apache.camel.catalog.DefaultCamelCatalog;
@@ -47,6 +48,10 @@ public abstract class DefaultConnectorComponent extends DefaultComponent impleme
     private final String componentName;
     private final ConnectorModel model;
     private Map<String, Object> componentOptions;
+    private Processor beforeProducer;
+    private Processor afterProducer;
+    private Processor beforeConsumer;
+    private Processor afterConsumer;
 
     protected DefaultConnectorComponent(String componentName, String className) {
         this.componentName = componentName;
@@ -197,6 +202,46 @@ public abstract class DefaultConnectorComponent extends DefaultComponent impleme
         super.doStop();
     }
 
+    @Override
+    public Processor getBeforeProducer() {
+        return beforeProducer; // plain read of the field written by setBeforeProducer
+    }
+
+    @Override
+    public void setBeforeProducer(Processor beforeProducer) {
+        this.beforeProducer = beforeProducer; // NOTE(review): DefaultConnectorEndpoint.createProducer reads this once per producer, so set it first
+    }
+
+    @Override
+    public Processor getAfterProducer() {
+        return afterProducer; // plain read of the field written by setAfterProducer
+    }
+
+    @Override
+    public void setAfterProducer(Processor afterProducer) {
+        this.afterProducer = afterProducer; // NOTE(review): DefaultConnectorEndpoint.createProducer reads this once per producer, so set it first
+    }
+
+    @Override
+    public Processor getBeforeConsumer() {
+        return beforeConsumer; // plain read of the field written by setBeforeConsumer
+    }
+
+    @Override
+    public void setBeforeConsumer(Processor beforeConsumer) {
+        this.beforeConsumer = beforeConsumer; // NOTE(review): stored only; no code in this diff reads it back for consumers - confirm
+    }
+
+    @Override
+    public Processor getAfterConsumer() {
+        return afterConsumer; // plain read of the field written by setAfterConsumer
+    }
+
+    @Override
+    public void setAfterConsumer(Processor afterConsumer) {
+        this.afterConsumer = afterConsumer; // NOTE(review): stored only; no code in this diff reads it back for consumers - confirm
+    }
+
     // ***************************************
     // Helpers
     // ***************************************

http://git-wip-us.apache.org/repos/asf/camel/blob/38c3cfa5/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
----------------------------------------------------------------------
diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
index 6550907..ba9e93b 100644
--- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
+++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.component.connector;
 
-import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.DelegateEndpoint;
 import org.apache.camel.Endpoint;
@@ -44,12 +43,19 @@ public class DefaultConnectorEndpoint extends DefaultEndpoint implements Delegat
 
     @Override
     public Producer createProducer() throws Exception {
-        return endpoint.createProducer();
+        Producer producer = endpoint.createProducer(); // wrapped below; the component's hooks are read once, at creation
+        return new ConnectorProducer(endpoint, producer, getComponent().getBeforeProducer(), getComponent().getAfterProducer());
     }
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        return endpoint.createConsumer(processor);
+        Consumer answer = endpoint.createConsumer(processor); // NOTE(review): beforeConsumer/afterConsumer hooks are not applied here
+        return answer;
+    }
+
+    @Override
+    public ConnectorComponent getComponent() {
+        return (ConnectorComponent) super.getComponent(); // downcast: ClassCastException if the component is not a ConnectorComponent
     }
 
     @Override