You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/10/06 08:14:37 UTC
camel git commit: camel-connector before/after should support async
routing engine via pipeline which also is simpler code.
Repository: camel
Updated Branches:
refs/heads/connector-async [created] 5b1b92669
camel-connector before/after should support async routing engine via pipeline which also is simpler code.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5b1b9266
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5b1b9266
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5b1b9266
Branch: refs/heads/connector-async
Commit: 5b1b92669ee238b3a1e2c80545ed88e9464f1298
Parents: a12916c
Author: Claus Ibsen <da...@apache.org>
Authored: Fri Oct 6 10:14:28 2017 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Oct 6 10:14:28 2017 +0200
----------------------------------------------------------------------
.../connector/ConnectorConsumerProcessor.java | 94 --------------------
.../component/connector/ConnectorProducer.java | 50 +++--------
.../connector/DefaultConnectorEndpoint.java | 34 ++++++-
3 files changed, 42 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/5b1b9266/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java
----------------------------------------------------------------------
diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java
deleted file mode 100644
index 53c3860..0000000
--- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorConsumerProcessor.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.connector;
-
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.processor.DelegateAsyncProcessor;
-import org.apache.camel.util.ServiceHelper;
-
-/**
- * Connector {@link Processor} which is capable of performing before and after custom processing
- * while consuming a message (ie from the consumer).
- */
-public class ConnectorConsumerProcessor extends DelegateAsyncProcessor {
-
- private final Processor beforeConsumer;
- private final Processor afterConsumer;
-
- public ConnectorConsumerProcessor(Processor processor, Processor beforeConsumer, Processor afterConsumer) {
- super(processor);
- this.beforeConsumer = beforeConsumer;
- this.afterConsumer = afterConsumer;
- }
-
- @Override
- public boolean process(Exchange exchange, final AsyncCallback callback) {
- // setup callback for after consumer
- AsyncCallback delegate = doneSync -> {
- if (afterConsumer != null) {
- try {
- afterConsumer.process(exchange);
- } catch (Throwable e) {
- exchange.setException(e);
- }
- }
- callback.done(doneSync);
- };
-
- // perform any before consumer
- if (beforeConsumer != null) {
- try {
- beforeConsumer.process(exchange);
- } catch (Throwable e) {
- exchange.setException(e);
- callback.done(true);
- return true;
- }
- }
-
- // process the consumer
- return super.process(exchange, delegate);
- }
-
- @Override
- protected void doStart() throws Exception {
- ServiceHelper.startServices(beforeConsumer, processor, afterConsumer);
- }
-
- @Override
- protected void doStop() throws Exception {
- ServiceHelper.stopServices(beforeConsumer, processor, afterConsumer);
- }
-
- @Override
- protected void doSuspend() throws Exception {
- ServiceHelper.suspendService(processor);
- }
-
- @Override
- protected void doResume() throws Exception {
- ServiceHelper.resumeService(processor);
- }
-
- @Override
- protected void doShutdown() throws Exception {
- ServiceHelper.stopAndShutdownServices(beforeConsumer, processor, afterConsumer);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/5b1b9266/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
----------------------------------------------------------------------
diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
index b5db55d..33f64c5 100644
--- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
+++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/ConnectorProducer.java
@@ -20,79 +20,51 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultAsyncProducer;
-import org.apache.camel.util.AsyncProcessorConverterHelper;
+import org.apache.camel.processor.Pipeline;
import org.apache.camel.util.ServiceHelper;
/**
* Connector {@link Producer} which is capable of performing before and after custom processing
- * while processing (ie sending the message).
+ * via the {@link Pipeline} while processing (ie sending the message).
*/
public class ConnectorProducer extends DefaultAsyncProducer {
- private final AsyncProcessor producer;
- private final Processor beforeProducer;
- private final Processor afterProducer;
+ private final AsyncProcessor processor;
- public ConnectorProducer(Endpoint endpoint, Producer producer, Processor beforeProducer, Processor afterProducer) {
+ public ConnectorProducer(Endpoint endpoint, Pipeline processor) {
super(endpoint);
- this.producer = AsyncProcessorConverterHelper.convert(producer);
- this.beforeProducer = beforeProducer;
- this.afterProducer = afterProducer;
+ this.processor = processor;
}
@Override
public boolean process(Exchange exchange, final AsyncCallback callback) {
- // setup callback for after producer
- AsyncCallback delegate = doneSync -> {
- if (afterProducer != null) {
- try {
- afterProducer.process(exchange);
- } catch (Throwable e) {
- exchange.setException(e);
- }
- }
- callback.done(doneSync);
- };
-
- // perform any before producer
- if (beforeProducer != null) {
- try {
- beforeProducer.process(exchange);
- } catch (Throwable e) {
- exchange.setException(e);
- callback.done(true);
- return true;
- }
- }
-
- return producer.process(exchange, delegate);
+ return processor.process(exchange, callback);
}
@Override
protected void doStart() throws Exception {
- ServiceHelper.startServices(beforeProducer, producer, afterProducer);
+ ServiceHelper.startServices(processor);
}
@Override
protected void doStop() throws Exception {
- ServiceHelper.stopServices(beforeProducer, producer, afterProducer);
+ ServiceHelper.stopServices(processor);
}
@Override
protected void doSuspend() throws Exception {
- ServiceHelper.suspendService(producer);
+ ServiceHelper.suspendService(processor);
}
@Override
protected void doResume() throws Exception {
- ServiceHelper.resumeService(producer);
+ ServiceHelper.resumeService(processor);
}
@Override
protected void doShutdown() throws Exception {
- ServiceHelper.stopAndShutdownServices(beforeProducer, producer, afterProducer);
+ ServiceHelper.stopAndShutdownServices(processor);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/5b1b9266/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
----------------------------------------------------------------------
diff --git a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
index cb254ce..c8158ad 100644
--- a/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
+++ b/connectors/camel-connector/src/main/java/org/apache/camel/component/connector/DefaultConnectorEndpoint.java
@@ -16,6 +16,9 @@
*/
package org.apache.camel.component.connector;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.camel.Consumer;
import org.apache.camel.DelegateEndpoint;
import org.apache.camel.Endpoint;
@@ -24,6 +27,7 @@ import org.apache.camel.Producer;
import org.apache.camel.api.management.ManagedAttribute;
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.processor.Pipeline;
import org.apache.camel.util.ServiceHelper;
@ManagedResource(description = "Managed Connector Endpoint")
@@ -44,13 +48,37 @@ public class DefaultConnectorEndpoint extends DefaultEndpoint implements Delegat
@Override
public Producer createProducer() throws Exception {
Producer producer = endpoint.createProducer();
- return new ConnectorProducer(endpoint, producer, getComponent().getBeforeProducer(), getComponent().getAfterProducer());
+
+ // use a pipeline to process before, producer, after in that order (before/after are only added when configured)
+ List<Processor> list = new ArrayList<>();
+ if (getComponent().getBeforeProducer() != null) {
+ list.add(getComponent().getBeforeProducer());
+ }
+ list.add(producer);
+ if (getComponent().getAfterProducer() != null) {
+ list.add(getComponent().getAfterProducer());
+ }
+
+ // create producer with the pipeline
+ Pipeline pipeline = new Pipeline(getCamelContext(), list);
+ return new ConnectorProducer(endpoint, pipeline);
}
@Override
public Consumer createConsumer(Processor processor) throws Exception {
- ConnectorConsumerProcessor delegate = new ConnectorConsumerProcessor(processor, getComponent().getBeforeConsumer(), getComponent().getAfterConsumer());
- Consumer consumer = endpoint.createConsumer(delegate);
+ // use a pipeline to process before, processor, after in that order
+ List<Processor> list = new ArrayList<>();
+ if (getComponent().getBeforeConsumer() != null) {
+ list.add(getComponent().getBeforeConsumer());
+ }
+ list.add(processor);
+ if (getComponent().getAfterConsumer() != null) {
+ list.add(getComponent().getAfterConsumer());
+ }
+
+ // create consumer with the pipeline
+ Pipeline pipeline = new Pipeline(getCamelContext(), list);
+ Consumer consumer = endpoint.createConsumer(pipeline);
configureConsumer(consumer);
return consumer;
}