You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/06/05 12:53:31 UTC

svn commit: r1132370 - in /camel/trunk/components/camel-hazelcast/src: main/java/org/apache/camel/component/hazelcast/seda/ test/java/org/apache/camel/component/hazelcast/

Author: davsclaus
Date: Sun Jun  5 10:53:31 2011
New Revision: 1132370

URL: http://svn.apache.org/viewvc?rev=1132370&view=rev
Log:
CAMEL-3983: Added transferExchange option to hazelcast seda. Thanks to Claus and Ioannis for the work on this.

Added:
    camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaTransferExchangeTest.java
Modified:
    camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java
    camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
    camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaProducer.java
    camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaConfigurationTest.java

Modified: camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java?rev=1132370&r1=1132369&r2=1132370&view=diff
==============================================================================
--- camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java (original)
+++ camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java Sun Jun  5 10:53:31 2011
@@ -24,6 +24,7 @@ public class HazelcastSedaConfiguration 
     private int concurrentConsumers = 1;
     private int pollInterval = 1000;
     private String queueName;
+    private boolean transferExchange;
 
     public HazelcastSedaConfiguration() {
         super();
@@ -52,4 +53,13 @@ public class HazelcastSedaConfiguration 
     public void setPollInterval(int pollInterval) {
         this.pollInterval = pollInterval;
     }
+
+    public boolean isTransferExchange() {
+        return transferExchange;
+    }
+
+    public void setTransferExchange(boolean transferExchange) {
+        this.transferExchange = transferExchange;
+    }
+
 }

Modified: camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java?rev=1132370&r1=1132369&r2=1132370&view=diff
==============================================================================
--- camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java (original)
+++ camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java Sun Jun  5 10:53:31 2011
@@ -28,6 +28,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultExchangeHolder;
 import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,10 +79,16 @@ public class HazelcastSedaConsumer exten
                 final Object body = queue.poll(endpoint.getConfiguration().getPollInterval(), TimeUnit.MILLISECONDS);
 
                 if (body != null) {
-                    exchange.getIn().setBody(body);
+                    if (body instanceof DefaultExchangeHolder) {
+                        DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) body);
+                    } else {
+                        exchange.getIn().setBody(body);
+                    }
                     try {
+                        // process using the asynchronous routing engine
                         processor.process(exchange, new AsyncCallback() {
-                            public void done(final boolean sync) {
+                            public void done(boolean asyncDone) {
+                                // noop
                             }
                         });
 

Modified: camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaProducer.java?rev=1132370&r1=1132369&r2=1132370&view=diff
==============================================================================
--- camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaProducer.java (original)
+++ camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaProducer.java Sun Jun  5 10:53:31 2011
@@ -20,15 +20,15 @@ import java.io.Serializable;
 import java.util.concurrent.BlockingQueue;
 
 import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Producer;
-import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.impl.DefaultExchangeHolder;
 
 /**
  * Implementation of Hazelcast SEDA {@link Producer} component. Just appends exchange body into a Hazelcast {@link BlockingQueue}.
  */
-public class HazelcastSedaProducer extends DefaultProducer implements AsyncProcessor {
+public class HazelcastSedaProducer extends DefaultAsyncProducer {
 
     private final transient BlockingQueue queue;
 
@@ -37,10 +37,6 @@ public class HazelcastSedaProducer exten
         this.queue = hzlq;
     }
 
-    public void process(final Exchange exchange) throws Exception {
-        checkAndStore(exchange);
-    }
-
     public boolean process(final Exchange exchange, final AsyncCallback callback) {
         checkAndStore(exchange);
         callback.done(true);
@@ -52,11 +48,18 @@ public class HazelcastSedaProducer exten
         Object obj;
         Object body = exchange.getIn().getBody();
 
-        // in case body is not serializable convert to byte array
-        if (!(body instanceof Serializable)) {
-            obj = exchange.getIn().getBody(byte[].class);
+        final HazelcastSedaEndpoint endpoint = (HazelcastSedaEndpoint) this.getEndpoint();
+        final HazelcastSedaConfiguration configuration = endpoint.getConfiguration();
+
+        if (configuration.isTransferExchange()) {
+            obj = DefaultExchangeHolder.marshal(exchange);
         } else {
-            obj = body;
+            // in case body is not serializable convert to byte array
+            if (!(body instanceof Serializable)) {
+                obj = exchange.getIn().getBody(byte[].class);
+            } else {
+                obj = body;
+            }
         }
 
         queue.add(obj);

Modified: camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaConfigurationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaConfigurationTest.java?rev=1132370&r1=1132369&r2=1132370&view=diff
==============================================================================
--- camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaConfigurationTest.java (original)
+++ camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaConfigurationTest.java Sun Jun  5 10:53:31 2011
@@ -23,6 +23,21 @@ import org.junit.Test;
 public class HazelcastSedaConfigurationTest extends CamelTestSupport {
 
     @Test
+    public void createEndpointWithTransferExchange() throws Exception {
+        HazelcastComponent hzlqComponent = new HazelcastComponent(context);
+
+        HazelcastSedaEndpoint hzlqEndpoint = (HazelcastSedaEndpoint) hzlqComponent.createEndpoint("hazelcast:seda:foo?transferExchange=true");
+
+        assertEquals("Invalid queue name", "foo", hzlqEndpoint.getConfiguration().getQueueName());
+        assertTrue("Default value of concurrent consumers is invalid", hzlqEndpoint.getConfiguration().isTransferExchange());
+
+        hzlqEndpoint = (HazelcastSedaEndpoint) hzlqComponent.createEndpoint("hazelcast:seda:foo?transferExchange=false");
+
+        assertEquals("Invalid queue name", "foo", hzlqEndpoint.getConfiguration().getQueueName());
+        assertFalse("Default value of concurrent consumers is invalid", hzlqEndpoint.getConfiguration().isTransferExchange());
+    }
+
+    @Test
     public void createEndpointWithNoParams() throws Exception {
         HazelcastComponent hzlqComponent = new HazelcastComponent(context);
 
@@ -52,7 +67,6 @@ public class HazelcastSedaConfigurationT
         assertEquals("Invalid queue name", "foo", hzlqEndpoint.getConfiguration().getQueueName());
         assertEquals("Default value of concurrent consumers is invalid", 1, hzlqEndpoint.getConfiguration().getConcurrentConsumers());
         assertEquals("Invalid pool interval", 4000, hzlqEndpoint.getConfiguration().getPollInterval());
-
     }
 
     @Test(expected = IllegalArgumentException.class)
@@ -60,4 +74,5 @@ public class HazelcastSedaConfigurationT
         HazelcastComponent hzlqComponent = new HazelcastComponent(context);
         hzlqComponent.createEndpoint("hazelcast:seda: ?concurrentConsumers=4");
     }
+
 }

Added: camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaTransferExchangeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaTransferExchangeTest.java?rev=1132370&view=auto
==============================================================================
--- camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaTransferExchangeTest.java (added)
+++ camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaTransferExchangeTest.java Sun Jun  5 10:53:31 2011
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class HazelcastSedaTransferExchangeTest extends CamelTestSupport {
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint mock;
+
+    @Test
+    public void testExchangeTransferEnabled() throws InterruptedException {
+        final String value = "CAMEL-3983";
+
+        mock.expectedMessageCount(1);
+        mock.expectedBodiesReceived("test");
+        mock.expectedHeaderReceived("test", value);
+
+        Exchange exchange = createExchangeWithBody("test");
+        exchange.getIn().setHeader("test", value);
+
+        template.send("direct:foobar", exchange);
+
+        assertMockEndpointsSatisfied();
+        mock.reset();
+    }
+
+    @Test(expected = AssertionError.class)
+    public void testExchangeTransferDisabled() throws InterruptedException {
+        mock.expectedMessageCount(1);
+        mock.expectedBodiesReceived("test");
+        mock.expectedHeaderReceived("test", "");
+
+        Exchange exchange = createExchangeWithBody("test");
+        exchange.getIn().setHeader("test", "fail...");
+
+        template.send("direct:foo", exchange);
+
+        assertMockEndpointsSatisfied();
+        mock.reset();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:foo").to("hazelcast:seda:foo");
+
+                from("direct:foobar").to("hazelcast:seda:foo?transferExchange=true");
+
+                from("hazelcast:seda:foo").to("mock:result");
+            }
+        };
+    }
+
+}