You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/09/18 12:49:47 UTC
[2/2] git commit: CAMEL-6765: RouteAware API to allow injecting the
Route into Consumer or other services which may need it
CAMEL-6765: RouteAware API to allow injecting the Route into Consumer or other services which may need it
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3f9984b0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3f9984b0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3f9984b0
Branch: refs/heads/camel-2.12.x
Commit: 3f9984b0c60c5b475dd700b1613b7805f301b3dd
Parents: 74e9ff9
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Sep 18 12:48:50 2013 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Wed Sep 18 12:49:30 2013 +0200
----------------------------------------------------------------------
.../main/java/org/apache/camel/RouteAware.java | 39 ++++++++++
.../org/apache/camel/impl/DefaultConsumer.java | 19 ++++-
.../camel/impl/EventDrivenConsumerRoute.java | 4 +
.../org/apache/camel/impl/RouteService.java | 7 ++
.../camel/processor/RouteAwareRouteTest.java | 78 ++++++++++++++++++++
5 files changed, 146 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/3f9984b0/camel-core/src/main/java/org/apache/camel/RouteAware.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/RouteAware.java b/camel-core/src/main/java/org/apache/camel/RouteAware.java
new file mode 100644
index 0000000..d871a95
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/RouteAware.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+/**
+ * Implemented by objects which wish to be injected with the {@link Route}
+ * they are used by, such as the {@link Consumer} which is the consumer for a route.
+ * <p/>
+ * The route is handed over through {@link #setRoute(Route)} and can be read
+ * back through {@link #getRoute()}.
+ */
+public interface RouteAware {
+
+ /**
+ * Injects the {@link Route}
+ *
+ * @param route the route to associate with this object
+ */
+ void setRoute(Route route);
+
+ /**
+ * Gets the {@link Route}
+ *
+ * @return the route, or {@code null} if no route has been set.
+ */
+ Route getRoute();
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/3f9984b0/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
index 74e9555..2d82a47 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
@@ -21,6 +21,8 @@ import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.Route;
+import org.apache.camel.RouteAware;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.support.ServiceSupport;
@@ -36,12 +38,13 @@ import org.slf4j.LoggerFactory;
*
* @version
*/
-public class DefaultConsumer extends ServiceSupport implements Consumer {
+public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAware {
protected final Logger log = LoggerFactory.getLogger(getClass());
private final Endpoint endpoint;
private final Processor processor;
private volatile AsyncProcessor asyncProcessor;
private ExceptionHandler exceptionHandler;
+ private Route route;
public DefaultConsumer(Endpoint endpoint, Processor processor) {
this.endpoint = endpoint;
@@ -54,6 +57,14 @@ public class DefaultConsumer extends ServiceSupport implements Consumer {
return "Consumer[" + URISupport.sanitizeUri(endpoint.getEndpointUri()) + "]";
}
+ public Route getRoute() {
+ return route; // null until setRoute(Route) has been called
+ }
+
+ public void setRoute(Route route) {
+ this.route = route; // kept so createUoW can fill in a missing from-route id on the exchange
+ }
+
/**
* If the consumer needs to defer done the {@link org.apache.camel.spi.UnitOfWork} on
* the processed {@link Exchange} then this method should be used to create and start
@@ -66,6 +77,12 @@ public class DefaultConsumer extends ServiceSupport implements Consumer {
* @see #doneUoW(org.apache.camel.Exchange)
*/
public UnitOfWork createUoW(Exchange exchange) throws Exception {
+ // if the exchange doesn't have from route id set, then set it if it originated
+ // from this unit of work
+ if (route != null && exchange.getFromRouteId() == null) {
+ exchange.setFromRouteId(route.getId());
+ }
+
UnitOfWork uow = UnitOfWorkHelper.createUoW(exchange);
exchange.setUnitOfWork(uow);
uow.start();
http://git-wip-us.apache.org/repos/asf/camel/blob/3f9984b0/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java
index 4276ccc..d762a51 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/EventDrivenConsumerRoute.java
@@ -22,6 +22,7 @@ import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
+import org.apache.camel.RouteAware;
import org.apache.camel.Service;
import org.apache.camel.SuspendableService;
import org.apache.camel.spi.RouteContext;
@@ -64,6 +65,9 @@ public class EventDrivenConsumerRoute extends DefaultRoute {
consumer = endpoint.createConsumer(processor);
if (consumer != null) {
services.add(consumer);
+ if (consumer instanceof RouteAware) {
+ ((RouteAware) consumer).setRoute(this);
+ }
}
Processor processor = getProcessor();
if (processor instanceof Service) {
http://git-wip-us.apache.org/repos/asf/camel/blob/3f9984b0/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/RouteService.java b/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
index 4f8f4e7..3df2829 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/RouteService.java
@@ -30,6 +30,7 @@ import org.apache.camel.Channel;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Route;
+import org.apache.camel.RouteAware;
import org.apache.camel.Service;
import org.apache.camel.model.OnCompletionDefinition;
import org.apache.camel.model.OnExceptionDefinition;
@@ -142,6 +143,12 @@ public class RouteService extends ChildServiceSupport {
// afterwards to avoid them being active while the others start
List<Service> childServices = new ArrayList<Service>();
for (Service service : list) {
+
+ // inject the route
+ if (service instanceof RouteAware) {
+ ((RouteAware) service).setRoute(route);
+ }
+
if (service instanceof Consumer) {
inputs.put(route, (Consumer) service);
} else {
http://git-wip-us.apache.org/repos/asf/camel/blob/3f9984b0/camel-core/src/test/java/org/apache/camel/processor/RouteAwareRouteTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/RouteAwareRouteTest.java b/camel-core/src/test/java/org/apache/camel/processor/RouteAwareRouteTest.java
new file mode 100644
index 0000000..62f5995
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/RouteAwareRouteTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Route;
+import org.apache.camel.RouteAware;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.support.ServiceSupport;
+
+public class RouteAwareRouteTest extends ContextTestSupport {
+
+ public void testRouteAware() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("foo"); // "foo" is the route id given via routeId("foo") below
+
+ template.sendBody("direct:start", "Hello World"); // the original body is expected to be replaced by MyProcessor
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start").routeId("foo")
+ .process(new MyProcessor())
+ .to("mock:result");
+ }
+ };
+ }
+
+ private static final class MyProcessor extends ServiceSupport implements Processor, RouteAware {
+
+ private Route route; // assigned through setRoute(Route); dereferenced in process(Exchange)
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setBody(route.getId()); // overwrite the body with the id of the injected route
+ }
+
+ @Override
+ public void setRoute(Route route) {
+ this.route = route;
+ }
+
+ @Override
+ public Route getRoute() {
+ return route;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ // noop - this test processor has nothing to do on start
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ // noop - this test processor has nothing to do on stop
+ }
+ }
+}