You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by js...@apache.org on 2007/09/04 17:22:33 UTC

svn commit: r572712 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/ main/java/org/apache/camel/impl/ main/java/org/apache/camel/util/ test/java/org/apache/camel/impl/

Author: jstrachan
Date: Tue Sep  4 08:22:33 2007
New Revision: 572712

URL: http://svn.apache.org/viewvc?rev=572712&view=rev
Log:
more helper methods for CAMEL-133 to make it easier to customize MEPs

Added:
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CustomExchangePatternTest.java
      - copied, changed from r572587, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FilterTest.java
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelTemplate.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelTemplate.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelTemplate.java?rev=572712&r1=572711&r2=572712&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelTemplate.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelTemplate.java Tue Sep  4 08:22:33 2007
@@ -72,6 +72,21 @@
     }
 
     /**
+     * Sends an exchange to an endpoint using a supplied
+     *
+     * {@link Processor} to populate the exchange
+     *
+     * @param endpointUri the endpoint URI to send the exchange to
+     * @param pattern the message {@link ExchangePattern} such as
+     *   {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
+     * @param processor the transformer used to populate the new exchange
+     */
+    public E send(String endpointUri, ExchangePattern pattern, Processor processor) {
+        Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
+        return send(endpoint, pattern, processor);
+    }
+
+    /**
      * Sends the exchange to the given endpoint
      * 
      * @param endpoint the endpoint to send the exchange to
@@ -96,6 +111,20 @@
     }
 
     /**
+     * Sends an exchange to an endpoint using a supplied
+     *
+     * {@link Processor} to populate the exchange
+     *
+     * @param endpoint the endpoint to send the exchange to
+     * @param pattern the message {@link ExchangePattern} such as
+     *   {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
+     * @param processor the transformer used to populate the new exchange
+     */
+    public E send(Endpoint<E> endpoint, ExchangePattern pattern, Processor processor) {
+        return producerCache.send(endpoint, pattern, processor);
+    }
+
+    /**
      * Send the body to an endpoint
      * 
      * @param endpoint
@@ -119,14 +148,9 @@
      * @param body = the payload
      * @return the result
      */
-    public Object sendBody(String endpointUri, final Object body) {
-        E result = send(endpointUri, new Processor() {
-            public void process(Exchange exchange) {
-                Message in = exchange.getIn();
-                in.setBody(body);
-            }
-        });
-        return extractResultBody(result);
+    public Object sendBody(String endpointUri, Object body) {
+        Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
+        return sendBody(endpoint, body);
     }
 
     /**

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java?rev=572712&r1=572711&r2=572712&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Producer.java Tue Sep  4 08:22:33 2007
@@ -34,6 +34,13 @@
     E createExchange();
 
     /**
+     * Creates a new exchange of the given pattern to send to this endpoint
+     *
+     * @return a newly created exchange
+     */
+    E createExchange(ExchangePattern pattern);
+
+    /**
      * Creates a new exchange for communicating with this exchange using the
      * given exchange to pre-populate the values of the headers and messages
      */

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java?rev=572712&r1=572711&r2=572712&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java Tue Sep  4 08:22:33 2007
@@ -22,26 +22,24 @@
  * @version $Revision: $
  */
 public interface ProducerTemplate<E extends Exchange> extends Service {
-
     /**
      * Sends the exchange to the default endpoint
-     * 
+     *
      * @param exchange the exchange to send
      */
     E send(E exchange);
 
     /**
      * Sends an exchange to the default endpoint using a supplied
-     * 
-     * @{link Processor} to populate the exchange
-     * 
+     *
      * @param processor the transformer used to populate the new exchange
+     * @{link Processor} to populate the exchange
      */
     E send(Processor processor);
 
     /**
      * Sends the body to the default endpoint and returns the result content
-     * 
+     *
      * @param body the body to send
      * @return the returned message body
      */
@@ -50,9 +48,9 @@
     /**
      * Sends the body to the default endpoint with a specified header and header
      * value
-     * 
-     * @param body the payload send
-     * @param header the header name
+     *
+     * @param body        the payload send
+     * @param header      the header name
      * @param headerValue the header value
      * @return the result
      */
@@ -61,7 +59,7 @@
     /**
      * Sends the body to the default endpoint with the specified headers and
      * header values
-     * 
+     *
      * @param body the payload send
      * @return the result
      */
@@ -72,25 +70,35 @@
 
     /**
      * Sends the exchange to the given endpoint
-     * 
+     *
      * @param endpointUri the endpoint URI to send the exchange to
-     * @param exchange the exchange to send
+     * @param exchange    the exchange to send
      */
     E send(String endpointUri, E exchange);
 
     /**
      * Sends an exchange to an endpoint using a supplied
-     * 
-     * @{link Processor} to populate the exchange
-     * 
+     *
      * @param endpointUri the endpoint URI to send the exchange to
-     * @param processor the transformer used to populate the new exchange
+     * @param processor   the transformer used to populate the new exchange
+     * @{link Processor} to populate the exchange
      */
     E send(String endpointUri, Processor processor);
 
     /**
+     * Sends an exchange to an endpoint using a supplied
+     * {@link Processor} to populate the exchange
+     *
+     * @param endpointUri the endpoint URI to send the exchange to
+     * @param pattern     the message {@link ExchangePattern} such as
+     *                    {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
+     * @param processor   the transformer used to populate the new exchange
+     */
+    E send(String endpointUri, ExchangePattern pattern, Processor processor);
+
+    /**
      * Sends the exchange to the given endpoint
-     * 
+     *
      * @param endpoint the endpoint to send the exchange to
      * @param exchange the exchange to send
      */
@@ -98,30 +106,39 @@
 
     /**
      * Sends an exchange to an endpoint using a supplied
-     * 
-     * @{link Processor} to populate the exchange
-     * 
-     * @param endpoint the endpoint to send the exchange to
+     *
+     * @param endpoint  the endpoint to send the exchange to
      * @param processor the transformer used to populate the new exchange
+     * @{link Processor} to populate the exchange
      */
     E send(Endpoint<E> endpoint, Processor processor);
 
     /**
+     * Sends an exchange to an endpoint using a supplied
+     * {@link Processor} to populate the exchange
+     *
+     * @param endpoint  the endpoint to send the exchange to
+     * @param pattern   the message {@link ExchangePattern} such as
+     *                  {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
+     * @param processor the transformer used to populate the new exchange
+     */
+    E send(Endpoint<E> endpoint, ExchangePattern pattern, Processor processor);
+
+    /**
      * Send the body to an endpoint
-     * 
+     *
      * @param endpoint
-     * @param body = the payload
+     * @param body     = the payload
      * @return the result
      */
     Object sendBody(Endpoint<E> endpoint, Object body);
 
     /**
      * Send the body to an endpoint
-     * 
+     *
      * @param endpointUri
-     * @param body = the payload
+     * @param body        = the payload
      * @return the result
      */
     Object sendBody(String endpointUri, Object body);
-
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java?rev=572712&r1=572711&r2=572712&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java Tue Sep  4 08:22:33 2007
@@ -155,7 +155,7 @@
     }
 
     public E createExchange(ExchangePattern pattern) {
-        return (E) new DefaultExchange(getContext(), getDefaultPattern());
+        return (E) new DefaultExchange(getContext(), pattern);
     }
 
     public ExchangePattern getDefaultPattern() {

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java?rev=572712&r1=572711&r2=572712&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducer.java Tue Sep  4 08:22:33 2007
@@ -19,6 +19,7 @@
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Producer;
+import org.apache.camel.ExchangePattern;
 
 /**
  * A default implementation of @{link Producer} for implementation inheritence
@@ -38,6 +39,10 @@
 
     public E createExchange() {
         return endpoint.createExchange();
+    }
+
+    public E createExchange(ExchangePattern pattern) {
+        return endpoint.createExchange(pattern);
     }
 
     public E createExchange(E exchange) {

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java?rev=572712&r1=572711&r2=572712&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ProducerCache.java Tue Sep  4 08:22:33 2007
@@ -25,6 +25,7 @@
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -69,7 +70,6 @@
 
     /**
      * Sends an exchange to an endpoint using a supplied
-     * 
      * @{link Processor} to populate the exchange
      * 
      * @param endpoint the endpoint to send the exchange to
@@ -79,19 +79,42 @@
         try {
             Producer<E> producer = getProducer(endpoint);
             E exchange = producer.createExchange();
+            return sendExchange(endpoint, producer, processor, exchange);
+        } catch (Exception e) {
+            throw new RuntimeCamelException(e);
+        }
+    }
 
-            // lets populate using the processor callback
-            processor.process(exchange);
-
-            // now lets dispatch
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(">>>> " + endpoint + " " + exchange);
-            }
-            producer.process(exchange);
-            return exchange;
+    /**
+     * Sends an exchange to an endpoint using a supplied
+     * {@link Processor} to populate the exchange
+     *
+     * @param endpoint the endpoint to send the exchange to
+     * @param pattern the message {@link ExchangePattern} such as
+     *   {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
+     * @param processor the transformer used to populate the new exchange
+     */
+    public E send(Endpoint<E> endpoint, ExchangePattern pattern, Processor processor) {
+         try {
+            Producer<E> producer = getProducer(endpoint);
+            E exchange = producer.createExchange(pattern);
+            return sendExchange(endpoint, producer, processor, exchange);
         } catch (Exception e) {
             throw new RuntimeCamelException(e);
         }
+    }
+
+
+    protected E sendExchange(Endpoint<E> endpoint, Producer<E> producer, Processor processor, E exchange) throws Exception {
+        // lets populate using the processor callback
+        processor.process(exchange);
+
+        // now lets dispatch
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(">>>> " + endpoint + " " + exchange);
+        }
+        producer.process(exchange);
+        return exchange;
     }
 
     protected void doStop() throws Exception {

Copied: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CustomExchangePatternTest.java (from r572587, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FilterTest.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CustomExchangePatternTest.java?p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CustomExchangePatternTest.java&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FilterTest.java&r1=572587&r2=572712&rev=572712&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FilterTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CustomExchangePatternTest.java Tue Sep  4 08:22:33 2007
@@ -14,42 +14,68 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.processor;
+package org.apache.camel.impl;
 
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 
+import java.util.List;
+
 /**
  * @version $Revision: 1.1 $
  */
-public class FilterTest extends ContextTestSupport {
+public class CustomExchangePatternTest extends ContextTestSupport {
+    protected MockEndpoint resultEndpoint;
 
-    public void testSendMatchingMessage() throws Exception {
-        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(1);
+    public void testInOut() throws Exception {
+        final ExchangePattern expectedPattern = ExchangePattern.InOut;
 
-        template.sendBodyAndHeader("direct:start", "<matched/>", "foo", "bar");
+        template.send("direct:start", expectedPattern, new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                assertEquals("MEP", expectedPattern, exchange.getPattern());
+                exchange.getIn().setBody("<hello>world!</hello>");
+            }
+        });
 
         resultEndpoint.assertIsSatisfied();
+        assertReceivedExpectedPattern(expectedPattern);
     }
 
-    public void testSendNotMatchingMessage() throws Exception {
-        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(0);
+    public void testInOnly() throws Exception {
+        ExchangePattern expectedPattern = ExchangePattern.InOnly;
 
-        template.sendBodyAndHeader("direct:start", "<notMatched/>", "foo", "notMatchedHeaderValue");
+        template.send("direct:start", expectedPattern, new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setBody("<hello>world!</hello>");
+            }
+        });
 
         resultEndpoint.assertIsSatisfied();
+        assertReceivedExpectedPattern(expectedPattern);
     }
 
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.expectedMessageCount(1);
+    }
+
+    protected void assertReceivedExpectedPattern(ExchangePattern expectedPattern) {
+        List<Exchange> list = resultEndpoint.getReceivedExchanges();
+        Exchange exchange = list.get(0);
+        assertEquals("MEP", expectedPattern, exchange.getPattern());
+    }
 
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                from("direct:start").filter(header("foo").isEqualTo("bar")).to("mock:result");
+                from("direct:start").to("mock:result");
             }
         };
     }
-
-}
+}
\ No newline at end of file