You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/12/29 17:54:30 UTC
[camel] 03/03: CAMEL-14338: Add RouteIdAware so EIP processors can
know which route they are serving
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 4d532a846ac616572b145e899b28dad2f503da00
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Dec 29 18:25:54 2019 +0100
CAMEL-14338: Add RouteIdAware so EIP processors can know which route they are serving
---
.../apache/camel/impl/engine/BaseRouteService.java | 4 +
.../impl/engine/EventDrivenConsumerRoute.java | 4 +
.../camel/processor/ConsumerRouteIdAwareTest.java | 106 +++++++++++++++++++++
.../camel/processor/RouteAwareProcessorTest.java | 94 ++++++++++++++++++
.../org/apache/camel/support/DefaultConsumer.java | 14 ++-
5 files changed, 221 insertions(+), 1 deletion(-)
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseRouteService.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseRouteService.java
index 33b72d4..51b99b4 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseRouteService.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseRouteService.java
@@ -40,6 +40,7 @@ import org.apache.camel.Service;
import org.apache.camel.processor.ErrorHandler;
import org.apache.camel.spi.LifecycleStrategy;
import org.apache.camel.spi.RouteContext;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.spi.RoutePolicy;
import org.apache.camel.support.ChildServiceSupport;
import org.apache.camel.support.EventHelper;
@@ -171,6 +172,9 @@ public abstract class BaseRouteService extends ChildServiceSupport {
if (service instanceof RouteAware) {
((RouteAware) service).setRoute(route);
}
+ if (service instanceof RouteIdAware) {
+ ((RouteIdAware) service).setRouteId(route.getId());
+ }
if (service instanceof Consumer) {
inputs.put(route, (Consumer) service);
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/EventDrivenConsumerRoute.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/EventDrivenConsumerRoute.java
index 2b87cfd..aacf546 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/EventDrivenConsumerRoute.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/EventDrivenConsumerRoute.java
@@ -29,6 +29,7 @@ import org.apache.camel.Suspendable;
import org.apache.camel.SuspendableService;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.RouteContext;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.PatternHelper;
/**
@@ -71,6 +72,9 @@ public class EventDrivenConsumerRoute extends DefaultRoute {
if (consumer instanceof RouteAware) {
((RouteAware) consumer).setRoute(this);
}
+ if (consumer instanceof RouteIdAware) {
+ ((RouteIdAware) consumer).setRouteId(this.getId());
+ }
}
Processor processor = getProcessor();
if (processor instanceof Service) {
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/ConsumerRouteIdAwareTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/ConsumerRouteIdAwareTest.java
new file mode 100644
index 0000000..31840b7
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/ConsumerRouteIdAwareTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.support.DefaultComponent;
+import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.support.DefaultEndpoint;
+import org.junit.Test;
+
+public class ConsumerRouteIdAwareTest extends ContextTestSupport {
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ context.addComponent("my", new MyComponent(context));
+
+ from("my:foo").routeId("foo").to("mock:result");
+ }
+ };
+ }
+
+ @Test
+ public void testRouteIdAware() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello from consumer route foo");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ private class MyComponent extends DefaultComponent {
+
+ public MyComponent(CamelContext context) {
+ super(context);
+ }
+
+ @Override
+ protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+ return new MyEndpoint(uri, this);
+ }
+ }
+
+ private class MyEndpoint extends DefaultEndpoint {
+
+ public MyEndpoint(String endpointUri, Component component) {
+ super(endpointUri, component);
+ }
+
+ @Override
+ public Producer createProducer() throws Exception {
+ throw new UnsupportedOperationException("Not supported");
+ }
+
+ @Override
+ public Consumer createConsumer(Processor processor) throws Exception {
+ return new MyConsumer(this, processor);
+ }
+ }
+
+ private class MyConsumer extends DefaultConsumer {
+
+ public MyConsumer(Endpoint endpoint, Processor processor) {
+ super(endpoint, processor);
+
+ Runnable run = () -> {
+ Exchange exchange = endpoint.createExchange();
+ exchange.getMessage().setBody("Hello from consumer route " + getRouteId());
+ try {
+ Thread.sleep(100);
+ processor.process(exchange);
+ } catch (Exception e) {
+ // ignore
+ }
+ };
+ Thread t = new Thread(run);
+ t.start();
+ }
+
+ }
+
+}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RouteAwareProcessorTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RouteAwareProcessorTest.java
new file mode 100644
index 0000000..8c553b7
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/RouteAwareProcessorTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
+import org.apache.camel.support.service.ServiceSupport;
+import org.junit.Test;
+
+public class RouteAwareProcessorTest extends ContextTestSupport {
+
+ private MyProcessor processor = new MyProcessor();
+
+ @Test
+ public void testRouteIdAware() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello route foo from processor myProcessor");
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start").routeId("foo")
+ .process(processor).id("myProcessor")
+ .to("mock:result");
+ }
+ };
+ }
+
+ private class MyProcessor extends ServiceSupport implements Processor, RouteIdAware, IdAware {
+
+ private String id;
+ private String routeId;
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getMessage().setBody("Hello route " + routeId + " from processor " + id);
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ // noop
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ // noop
+ }
+ }
+
+}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
index beb5870..e263c9a 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
@@ -25,6 +25,7 @@ import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.RouteAware;
import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
@@ -33,7 +34,7 @@ import org.apache.camel.util.URISupport;
/**
* A default consumer useful for implementation inheritance.
*/
-public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAware {
+public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAware, RouteIdAware {
private transient String consumerToString;
private final Endpoint endpoint;
@@ -41,6 +42,7 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw
private volatile AsyncProcessor asyncProcessor;
private ExceptionHandler exceptionHandler;
private Route route;
+ private String routeId;
public DefaultConsumer(Endpoint endpoint, Processor processor) {
this.endpoint = endpoint;
@@ -66,6 +68,16 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw
this.route = route;
}
+ @Override
+ public String getRouteId() {
+ return routeId;
+ }
+
+ @Override
+ public void setRouteId(String routeId) {
+ this.routeId = routeId;
+ }
+
/**
* If the consumer needs to defer done the {@link org.apache.camel.spi.UnitOfWork} on
* the processed {@link Exchange} then this method should be use to create and start