You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/12/29 17:54:30 UTC

[camel] 03/03: CAMEL-14338: Add RouteIdAware so EIP processors can know which route they are serving

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 4d532a846ac616572b145e899b28dad2f503da00
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Dec 29 18:25:54 2019 +0100

    CAMEL-14338: Add RouteIdAware so EIP processors can know which route they are serving
---
 .../apache/camel/impl/engine/BaseRouteService.java |   4 +
 .../impl/engine/EventDrivenConsumerRoute.java      |   4 +
 .../camel/processor/ConsumerRouteIdAwareTest.java  | 106 +++++++++++++++++++++
 .../camel/processor/RouteAwareProcessorTest.java   |  94 ++++++++++++++++++
 .../org/apache/camel/support/DefaultConsumer.java  |  14 ++-
 5 files changed, 221 insertions(+), 1 deletion(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseRouteService.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseRouteService.java
index 33b72d4..51b99b4 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseRouteService.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/BaseRouteService.java
@@ -40,6 +40,7 @@ import org.apache.camel.Service;
 import org.apache.camel.processor.ErrorHandler;
 import org.apache.camel.spi.LifecycleStrategy;
 import org.apache.camel.spi.RouteContext;
+import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.spi.RoutePolicy;
 import org.apache.camel.support.ChildServiceSupport;
 import org.apache.camel.support.EventHelper;
@@ -171,6 +172,9 @@ public abstract class BaseRouteService extends ChildServiceSupport {
                     if (service instanceof RouteAware) {
                         ((RouteAware) service).setRoute(route);
                     }
+                    if (service instanceof RouteIdAware) {
+                        ((RouteIdAware) service).setRouteId(route.getId());
+                    }
 
                     if (service instanceof Consumer) {
                         inputs.put(route, (Consumer) service);
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/EventDrivenConsumerRoute.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/EventDrivenConsumerRoute.java
index 2b87cfd..aacf546 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/EventDrivenConsumerRoute.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/EventDrivenConsumerRoute.java
@@ -29,6 +29,7 @@ import org.apache.camel.Suspendable;
 import org.apache.camel.SuspendableService;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.spi.RouteContext;
+import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.support.PatternHelper;
 
 /**
@@ -71,6 +72,9 @@ public class EventDrivenConsumerRoute extends DefaultRoute {
             if (consumer instanceof RouteAware) {
                 ((RouteAware) consumer).setRoute(this);
             }
+            if (consumer instanceof RouteIdAware) {
+                ((RouteIdAware) consumer).setRouteId(this.getId());
+            }
         }
         Processor processor = getProcessor();
         if (processor instanceof Service) {
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/ConsumerRouteIdAwareTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/ConsumerRouteIdAwareTest.java
new file mode 100644
index 0000000..31840b7
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/ConsumerRouteIdAwareTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.support.DefaultComponent;
+import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.support.DefaultEndpoint;
+import org.junit.Test;
+
+public class ConsumerRouteIdAwareTest extends ContextTestSupport {
+    // Checks that a consumer built on DefaultConsumer can report the id of the route it feeds.
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.addComponent("my", new MyComponent(context));
+
+                from("my:foo").routeId("foo").to("mock:result");
+            }
+        };
+    }
+
+    @Test
+    public void testRouteIdAware() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello from consumer route foo");
+        // the test sends nothing itself: MyConsumer emits the single message from its own thread
+        assertMockEndpointsSatisfied();
+    }
+
+    private class MyComponent extends DefaultComponent {
+
+        public MyComponent(CamelContext context) {
+            super(context);
+        }
+
+        @Override
+        protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+            return new MyEndpoint(uri, this);
+        }
+    }
+
+    private class MyEndpoint extends DefaultEndpoint {
+        // consumer-only endpoint: createProducer always throws
+        public MyEndpoint(String endpointUri, Component component) {
+            super(endpointUri, component);
+        }
+
+        @Override
+        public Producer createProducer() throws Exception {
+            throw new UnsupportedOperationException("Not supported");
+        }
+
+        @Override
+        public Consumer createConsumer(Processor processor) throws Exception {
+            return new MyConsumer(this, processor);
+        }
+    }
+
+    private class MyConsumer extends DefaultConsumer {
+        // sends one message whose body contains getRouteId(), from a background thread
+        public MyConsumer(Endpoint endpoint, Processor processor) {
+            super(endpoint, processor);
+            // setRouteId is called on the already constructed consumer, so read the id only after the delay
+            Runnable run = () -> {
+                Exchange exchange = endpoint.createExchange();
+                try {
+                    Thread.sleep(100);
+                    exchange.getMessage().setBody("Hello from consumer route " + getRouteId());
+                    processor.process(exchange);
+                } catch (Exception e) {
+                    // ignore: the outcome is asserted on mock:result by the test
+                }
+            };
+            Thread t = new Thread(run);
+            t.start();
+        }
+
+    }
+
+}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RouteAwareProcessorTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RouteAwareProcessorTest.java
new file mode 100644
index 0000000..8c553b7
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/RouteAwareProcessorTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.RouteIdAware;
+import org.apache.camel.support.service.ServiceSupport;
+import org.junit.Test;
+
+public class RouteAwareProcessorTest extends ContextTestSupport {
+    // the same instance is used in the route below; the test itself never calls setId/setRouteId on it
+    private MyProcessor processor = new MyProcessor();
+
+    @Test
+    public void testRouteIdAware() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello route foo from processor myProcessor");
+        // MyProcessor replaces the body sent here, so the mock receives the ids instead of "Hello World"
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").routeId("foo")
+                    .process(processor).id("myProcessor")
+                    .to("mock:result");
+            }
+        };
+    }
+
+    private class MyProcessor extends ServiceSupport implements Processor, RouteIdAware, IdAware {
+        // NOTE(review): presumably extends ServiceSupport so the route treats it as a service and calls setRouteId - confirm
+        private String id;
+        private String routeId;
+
+        @Override
+        public String getId() {
+            return id;
+        }
+
+        @Override
+        public void setId(String id) {
+            this.id = id;
+        }
+
+        @Override
+        public String getRouteId() {
+            return routeId;
+        }
+
+        @Override
+        public void setRouteId(String routeId) {
+            this.routeId = routeId;
+        }
+
+        @Override
+        public void process(Exchange exchange) throws Exception {
+            exchange.getMessage().setBody("Hello route " + routeId + " from processor " + id);
+        }
+
+        @Override
+        protected void doStart() throws Exception {
+            // noop: the processor only holds two strings, nothing needs starting
+        }
+
+        @Override
+        protected void doStop() throws Exception {
+            // noop: nothing to release
+        }
+    }
+
+}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
index beb5870..e263c9a 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
@@ -25,6 +25,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.RouteAware;
 import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.spi.RouteIdAware;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.service.ServiceHelper;
 import org.apache.camel.support.service.ServiceSupport;
@@ -33,7 +34,7 @@ import org.apache.camel.util.URISupport;
 /**
  * A default consumer useful for implementation inheritance.
  */
-public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAware {
+public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAware, RouteIdAware {
 
     private transient String consumerToString;
     private final Endpoint endpoint;
@@ -41,6 +42,7 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw
     private volatile AsyncProcessor asyncProcessor;
     private ExceptionHandler exceptionHandler;
     private Route route;
+    private String routeId;
 
     public DefaultConsumer(Endpoint endpoint, Processor processor) {
         this.endpoint = endpoint;
@@ -66,6 +68,16 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw
         this.route = route;
     }
 
+    @Override
+    public String getRouteId() {
+        return routeId; // the value last passed to setRouteId; presumably null before the route has set it
+    }
+
+    @Override
+    public void setRouteId(String routeId) {
+        this.routeId = routeId; // stored as given, no validation; the route code in this patch passes its own getId()
+    }
+
     /**
      * If the consumer needs to defer done the {@link org.apache.camel.spi.UnitOfWork} on
      * the processed {@link Exchange} then this method should be use to create and start