You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ra...@apache.org on 2016/03/30 21:05:23 UTC

camel git commit: CAMEL-9765: Direct-VM - Header filter strategy & property propagation flag.

Repository: camel
Updated Branches:
  refs/heads/master fd89a88cf -> 80957b2d3


CAMEL-9765: Direct-VM - Header filter strategy & property propagation flag.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/80957b2d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/80957b2d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/80957b2d

Branch: refs/heads/master
Commit: 80957b2d3d9c1d2ebf4c51630f76cf4828f37170
Parents: fd89a88
Author: Raúl Kripalani <ra...@apache.org>
Authored: Fri Mar 25 21:07:40 2016 +0000
Committer: Raúl Kripalani <ra...@apache.org>
Committed: Wed Mar 30 20:01:36 2016 +0100

----------------------------------------------------------------------
 .../component/directvm/DirectVmComponent.java   | 32 ++++++++
 .../component/directvm/DirectVmConsumer.java    |  1 -
 .../component/directvm/DirectVmEndpoint.java    | 33 ++++++++-
 .../component/directvm/DirectVmProducer.java    | 54 +++++++++-----
 .../DirectVmHeaderFilterStrategyTest.java       | 77 ++++++++++++++++++++
 ...ectVmNoPropertyPropagationComponentTest.java | 62 ++++++++++++++++
 .../DirectVmNoPropertyPropagationTest.java      | 61 ++++++++++++++++
 7 files changed, 299 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/80957b2d/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmComponent.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmComponent.java b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmComponent.java
index 3e48c7c..d5c341f 100644
--- a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmComponent.java
+++ b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmComponent.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.impl.UriEndpointComponent;
+import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.Metadata;
 
 /**
@@ -41,6 +42,9 @@ public class DirectVmComponent extends UriEndpointComponent {
     private boolean block;
     @Metadata(defaultValue = "30000")
     private long timeout = 30000L;
+    private HeaderFilterStrategy headerFilterStrategy;
+    @Metadata(defaultValue = "true")
+    private Boolean propagateProperties = Boolean.TRUE;
 
     public DirectVmComponent() {
         super(DirectVmEndpoint.class);
@@ -65,6 +69,7 @@ public class DirectVmComponent extends UriEndpointComponent {
         answer.setBlock(block);
         answer.setTimeout(timeout);
         answer.configureProperties(parameters);
+        setProperties(answer, parameters);
         return answer;
     }
 
@@ -132,4 +137,31 @@ public class DirectVmComponent extends UriEndpointComponent {
     public void setTimeout(long timeout) {
         this.timeout = timeout;
     }
+
+    public HeaderFilterStrategy getHeaderFilterStrategy() {
+        return headerFilterStrategy;
+    }
+
+    /**
+     * Sets a {@link HeaderFilterStrategy} that will only be applied on producer endpoints (on both directions: request and response).
+     * <p>Default value: none.</p>
+     * @param headerFilterStrategy
+     */
+    public void setHeaderFilterStrategy(HeaderFilterStrategy headerFilterStrategy) {
+        this.headerFilterStrategy = headerFilterStrategy;
+    }
+
+    public boolean isPropagateProperties() {
+        return propagateProperties;
+    }
+
+    /**
+     * Whether to propagate or not properties from the producer side to the consumer side, and viceversa.
+     * <p>Default value: true.</p>
+     * @param propagateProperties
+     */
+    public void setPropagateProperties(boolean propagateProperties) {
+        this.propagateProperties = propagateProperties;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/80957b2d/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmConsumer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmConsumer.java b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmConsumer.java
index 4b18fe3..827e975 100644
--- a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmConsumer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmConsumer.java
@@ -18,7 +18,6 @@ package org.apache.camel.component.directvm;
 
 import org.apache.camel.Processor;
 import org.apache.camel.Suspendable;
-import org.apache.camel.SuspendableService;
 import org.apache.camel.impl.DefaultConsumer;
 
 /**

http://git-wip-us.apache.org/repos/asf/camel/blob/80957b2d/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java
index db5d1c3..eeb3bdf 100644
--- a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmEndpoint.java
@@ -21,6 +21,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.component.direct.DirectConsumer;
 import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
@@ -43,6 +44,10 @@ public class DirectVmEndpoint extends DefaultEndpoint {
     private long timeout = 30000L;
     @UriParam(label = "producer")
     private boolean failIfNoConsumers = true;
+    @UriParam(label = "headerFilterStrategy")
+    private HeaderFilterStrategy headerFilterStrategy;
+    @UriParam(label = "propagateProperties", defaultValue = "false")
+    private Boolean propagateProperties;
 
     public DirectVmEndpoint(String endpointUri, DirectVmComponent component) {
         super(endpointUri, component);
@@ -106,10 +111,36 @@ public class DirectVmEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * Whether the producer should fail by throwing an exception, when sending to a DIRECT-VM endpoint with no active consumers.
+     * Whether the producer should fail by throwing an exception, when sending to a Direct-VM endpoint with no active consumers.
      */
     public void setFailIfNoConsumers(boolean failIfNoConsumers) {
         this.failIfNoConsumers = failIfNoConsumers;
     }
 
+    public HeaderFilterStrategy getHeaderFilterStrategy() {
+        return headerFilterStrategy == null ? getComponent().getHeaderFilterStrategy() : headerFilterStrategy;
+    }
+
+    /**
+     * Sets a {@link HeaderFilterStrategy} that will only be applied on producer endpoints (on both directions: request and response).
+     * <p>Default value: none.</p>
+     * @param headerFilterStrategy
+     */
+    public void setHeaderFilterStrategy(HeaderFilterStrategy headerFilterStrategy) {
+        this.headerFilterStrategy = headerFilterStrategy;
+    }
+
+    public Boolean isPropagateProperties() {
+        return propagateProperties == null ? getComponent().isPropagateProperties() : propagateProperties;
+    }
+
+    /**
+     * Whether to propagate or not properties from the producer side to the consumer side, and viceversa.
+     * <p>Default value: true.</p>
+     * @param propagateProperties
+     */
+    public void setPropagateProperties(Boolean propagateProperties) {
+        this.propagateProperties = propagateProperties;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/80957b2d/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java
index 32fb395..d93d849 100644
--- a/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java
+++ b/camel-core/src/main/java/org/apache/camel/component/directvm/DirectVmProducer.java
@@ -19,9 +19,10 @@ package org.apache.camel.component.directvm;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.spi.HeaderFilterStrategy;
 
 /**
- * The direct-vm producer
+ * The Direct-VM producer.
  */
 public class DirectVmProducer extends DefaultAsyncProducer {
 
@@ -33,24 +34,10 @@ public class DirectVmProducer extends DefaultAsyncProducer {
     }
 
     @Override
-    public void process(Exchange exchange) throws Exception {
-        // send to consumer
-        DirectVmConsumer consumer = endpoint.getComponent().getConsumer(endpoint);
-        if (consumer == null) {
-            if (endpoint.isFailIfNoConsumers()) {
-                throw new DirectVmConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange);
-            } else {
-                log.debug("message ignored, no consumers available on endpoint: " + endpoint);
-            }
-        } else {
-            consumer.getProcessor().process(exchange);
-        }
-    }
-
-    @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
         // send to consumer
         DirectVmConsumer consumer = endpoint.getComponent().getConsumer(endpoint);
+        
         if (consumer == null) {
             if (endpoint.isFailIfNoConsumers()) {
                 exchange.setException(new DirectVmConsumerNotAvailableException("No consumers available on endpoint: " + endpoint, exchange));
@@ -59,8 +46,37 @@ public class DirectVmProducer extends DefaultAsyncProducer {
             }
             callback.done(true);
             return true;
-        } else {
-            return endpoint.getConsumer().getAsyncProcessor().process(exchange, callback);
         }
+        
+        final HeaderFilterStrategy headerFilterStrategy = endpoint.getHeaderFilterStrategy();
+
+        // Only clone the Exchange if we actually need to filter out properties or headers.
+        final Exchange submitted = (!endpoint.isPropagateProperties() || headerFilterStrategy != null) ? exchange.copy(true) : exchange;
+        
+        // Clear properties in the copy if we are not propagating them.
+        if (!endpoint.isPropagateProperties()) {
+            submitted.getProperties().clear();
+        }
+        
+        // Filter headers by Header Filter Strategy if there is one set.
+        if (headerFilterStrategy != null) {
+            submitted.getIn().getHeaders().entrySet().removeIf(e -> headerFilterStrategy.applyFilterToCamelHeaders(e.getKey(), e.getValue(), submitted));
+        }
+        
+        return consumer.getAsyncProcessor().process(submitted, done -> {
+                exchange.setException(submitted.getException());
+                exchange.getOut().copyFrom(submitted.hasOut() ? submitted.getOut() : submitted.getIn());
+            
+                if (headerFilterStrategy != null) {
+                    exchange.getOut().getHeaders().entrySet().removeIf(e -> headerFilterStrategy.applyFilterToExternalHeaders(e.getKey(), e.getValue(), submitted));
+                }
+                
+                if (endpoint.isPropagateProperties()) {
+                    exchange.getProperties().putAll(submitted.getProperties());
+                }
+                
+                callback.done(done);
+            });
     }
-}
+    
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/80957b2d/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmHeaderFilterStrategyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmHeaderFilterStrategyTest.java b/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmHeaderFilterStrategyTest.java
new file mode 100644
index 0000000..40315d1
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmHeaderFilterStrategyTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.directvm;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.spi.HeaderFilterStrategy;
+
+/**
+ *
+ */
+public class DirectVmHeaderFilterStrategyTest extends ContextTestSupport {
+
+    public void testPropertiesPropagatedOrNot() throws Exception {
+        context.getRegistry(JndiRegistry.class).bind("headerFilterStrategy", new HeaderFilterStrategy() {
+            @Override
+            public boolean applyFilterToExternalHeaders(String headerName, Object headerValue, Exchange exchange) {
+                return headerName.equals("Header2");
+            }
+            
+            @Override
+            public boolean applyFilterToCamelHeaders(String headerName, Object headerValue, Exchange exchange) {
+                return headerName.equals("Header1");
+            }
+        });
+        
+        Exchange response = template.request("direct-vm:start.filter?headerFilterStrategy=#headerFilterStrategy", exchange -> {
+                exchange.getIn().setBody("Hello World");
+                exchange.getIn().setHeader("Header1", "Value1");
+            });
+        
+        assertNull(response.getException());
+        assertNull(response.getOut().getHeader("Header2"));
+        
+        response = template.request("direct-vm:start.nofilter", exchange -> {
+                exchange.getIn().setBody("Hello World");
+                exchange.getIn().setHeader("Header1", "Value1");
+            });
+        
+        assertNull(response.getException());
+        assertEquals("Value2", response.getOut().getHeader("Header2", String.class));
+        
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("direct-vm:start.filter").process(exchange -> {
+                        assertNull(exchange.getIn().getHeader("Header1"));
+                        exchange.getIn().setHeader("Header2", "Value2");
+                    });
+                
+                from("direct-vm:start.nofilter").process(exchange -> {
+                        assertEquals("Value1", exchange.getIn().getHeader("Header1"));
+                        exchange.getIn().setHeader("Header2", "Value2");
+                    });
+            }
+        };
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/80957b2d/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmNoPropertyPropagationComponentTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmNoPropertyPropagationComponentTest.java b/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmNoPropertyPropagationComponentTest.java
new file mode 100644
index 0000000..23b2598
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmNoPropertyPropagationComponentTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.directvm;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ *
+ */
+public class DirectVmNoPropertyPropagationComponentTest extends ContextTestSupport {
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+        
+        DirectVmComponent directvm = new DirectVmComponent();
+        directvm.setPropagateProperties(false);
+        context.addComponent("direct-vm", directvm);
+        
+        return context;
+    }
+
+    public void testPropertiesPropagatedOrNot() throws Exception {
+
+            
+        template.sendBody("direct-vm:start.default", "Hello World");
+    }
+    
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                // Starters.
+                from("direct-vm:start.default")
+                    .setProperty("abc", constant("def"))
+                    .to("direct-vm:foo.noprops");
+                
+                // Asserters.
+                from("direct-vm:foo.noprops").process(exchange -> 
+                    assertNull(exchange.getProperty("abc"))
+                );
+                
+            }
+        };
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/80957b2d/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmNoPropertyPropagationTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmNoPropertyPropagationTest.java b/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmNoPropertyPropagationTest.java
new file mode 100644
index 0000000..6a94bba
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/component/directvm/DirectVmNoPropertyPropagationTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.directvm;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ *
+ */
+public class DirectVmNoPropertyPropagationTest extends ContextTestSupport {
+
+    public void testPropertiesPropagatedOrNot() throws Exception {
+        template.sendBody("direct-vm:start.noprops", "Hello World");
+        template.sendBody("direct-vm:start.props", "Hello World");
+        template.sendBody("direct-vm:start.default", "Hello World");
+    }
+    
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                // Starters.
+                from("direct-vm:start.noprops")
+                    .setProperty("abc", constant("def"))
+                    .to("direct-vm:foo.noprops?propagateProperties=false");
+                
+                from("direct-vm:start.props")
+                    .setProperty("abc", constant("def"))
+                    .to("direct-vm:foo.props?propagateProperties=true");
+
+                from("direct-vm:start.default")
+                    .setProperty("abc", constant("def"))
+                    .to("direct-vm:foo.props");
+                
+                // Asserters.
+                from("direct-vm:foo.noprops").process(exchange -> 
+                    assertNull(exchange.getProperty("abc"))
+                );
+                
+                from("direct-vm:foo.props").process(exchange -> 
+                    assertEquals("def", exchange.getProperty("abc", String.class))
+                );
+            }
+        };
+    }
+
+}
\ No newline at end of file