You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/06/05 12:53:31 UTC
svn commit: r1132370 - in /camel/trunk/components/camel-hazelcast/src:
main/java/org/apache/camel/component/hazelcast/seda/
test/java/org/apache/camel/component/hazelcast/
Author: davsclaus
Date: Sun Jun 5 10:53:31 2011
New Revision: 1132370
URL: http://svn.apache.org/viewvc?rev=1132370&view=rev
Log:
CAMEL-3983: Added transferExchange option to hazelcast seda. Thanks to Claus and Ioannis for the work on this.
Added:
camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaTransferExchangeTest.java
Modified:
camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java
camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaProducer.java
camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaConfigurationTest.java
Modified: camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java?rev=1132370&r1=1132369&r2=1132370&view=diff
==============================================================================
--- camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java (original)
+++ camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConfiguration.java Sun Jun 5 10:53:31 2011
@@ -24,6 +24,7 @@ public class HazelcastSedaConfiguration
private int concurrentConsumers = 1;
private int pollInterval = 1000;
private String queueName;
+ private boolean transferExchange;
public HazelcastSedaConfiguration() {
super();
@@ -52,4 +53,13 @@ public class HazelcastSedaConfiguration
public void setPollInterval(int pollInterval) {
this.pollInterval = pollInterval;
}
+
+ public boolean isTransferExchange() {
+ return transferExchange;
+ }
+
+ public void setTransferExchange(boolean transferExchange) {
+ this.transferExchange = transferExchange;
+ }
+
}
Modified: camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java?rev=1132370&r1=1132369&r2=1132370&view=diff
==============================================================================
--- camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java (original)
+++ camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java Sun Jun 5 10:53:31 2011
@@ -28,6 +28,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultExchangeHolder;
import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,10 +79,16 @@ public class HazelcastSedaConsumer exten
final Object body = queue.poll(endpoint.getConfiguration().getPollInterval(), TimeUnit.MILLISECONDS);
if (body != null) {
- exchange.getIn().setBody(body);
+ if (body instanceof DefaultExchangeHolder) {
+ DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) body);
+ } else {
+ exchange.getIn().setBody(body);
+ }
try {
+ // process using the asynchronous routing engine
processor.process(exchange, new AsyncCallback() {
- public void done(final boolean sync) {
+ public void done(boolean asyncDone) {
+ // noop
}
});
Modified: camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaProducer.java?rev=1132370&r1=1132369&r2=1132370&view=diff
==============================================================================
--- camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaProducer.java (original)
+++ camel/trunk/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaProducer.java Sun Jun 5 10:53:31 2011
@@ -20,15 +20,15 @@ import java.io.Serializable;
import java.util.concurrent.BlockingQueue;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Producer;
-import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.impl.DefaultAsyncProducer;
+import org.apache.camel.impl.DefaultExchangeHolder;
/**
* Implementation of Hazelcast SEDA {@link Producer} component. Just appends exchange body into a Hazelcast {@link BlockingQueue}.
*/
-public class HazelcastSedaProducer extends DefaultProducer implements AsyncProcessor {
+public class HazelcastSedaProducer extends DefaultAsyncProducer {
private final transient BlockingQueue queue;
@@ -37,10 +37,6 @@ public class HazelcastSedaProducer exten
this.queue = hzlq;
}
- public void process(final Exchange exchange) throws Exception {
- checkAndStore(exchange);
- }
-
public boolean process(final Exchange exchange, final AsyncCallback callback) {
checkAndStore(exchange);
callback.done(true);
@@ -52,11 +48,18 @@ public class HazelcastSedaProducer exten
Object obj;
Object body = exchange.getIn().getBody();
- // in case body is not serializable convert to byte array
- if (!(body instanceof Serializable)) {
- obj = exchange.getIn().getBody(byte[].class);
+ final HazelcastSedaEndpoint endpoint = (HazelcastSedaEndpoint) this.getEndpoint();
+ final HazelcastSedaConfiguration configuration = endpoint.getConfiguration();
+
+ if (configuration.isTransferExchange()) {
+ obj = DefaultExchangeHolder.marshal(exchange);
} else {
- obj = body;
+ // in case body is not serializable convert to byte array
+ if (!(body instanceof Serializable)) {
+ obj = exchange.getIn().getBody(byte[].class);
+ } else {
+ obj = body;
+ }
}
queue.add(obj);
Modified: camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaConfigurationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaConfigurationTest.java?rev=1132370&r1=1132369&r2=1132370&view=diff
==============================================================================
--- camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaConfigurationTest.java (original)
+++ camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaConfigurationTest.java Sun Jun 5 10:53:31 2011
@@ -23,6 +23,21 @@ import org.junit.Test;
public class HazelcastSedaConfigurationTest extends CamelTestSupport {
@Test
+ public void createEndpointWithTransferExchange() throws Exception {
+ HazelcastComponent hzlqComponent = new HazelcastComponent(context);
+
+ HazelcastSedaEndpoint hzlqEndpoint = (HazelcastSedaEndpoint) hzlqComponent.createEndpoint("hazelcast:seda:foo?transferExchange=true");
+
+ assertEquals("Invalid queue name", "foo", hzlqEndpoint.getConfiguration().getQueueName());
+ assertTrue("Default value of concurrent consumers is invalid", hzlqEndpoint.getConfiguration().isTransferExchange());
+
+ hzlqEndpoint = (HazelcastSedaEndpoint) hzlqComponent.createEndpoint("hazelcast:seda:foo?transferExchange=false");
+
+ assertEquals("Invalid queue name", "foo", hzlqEndpoint.getConfiguration().getQueueName());
+ assertFalse("Default value of concurrent consumers is invalid", hzlqEndpoint.getConfiguration().isTransferExchange());
+ }
+
+ @Test
public void createEndpointWithNoParams() throws Exception {
HazelcastComponent hzlqComponent = new HazelcastComponent(context);
@@ -52,7 +67,6 @@ public class HazelcastSedaConfigurationT
assertEquals("Invalid queue name", "foo", hzlqEndpoint.getConfiguration().getQueueName());
assertEquals("Default value of concurrent consumers is invalid", 1, hzlqEndpoint.getConfiguration().getConcurrentConsumers());
assertEquals("Invalid pool interval", 4000, hzlqEndpoint.getConfiguration().getPollInterval());
-
}
@Test(expected = IllegalArgumentException.class)
@@ -60,4 +74,5 @@ public class HazelcastSedaConfigurationT
HazelcastComponent hzlqComponent = new HazelcastComponent(context);
hzlqComponent.createEndpoint("hazelcast:seda: ?concurrentConsumers=4");
}
+
}
Added: camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaTransferExchangeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaTransferExchangeTest.java?rev=1132370&view=auto
==============================================================================
--- camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaTransferExchangeTest.java (added)
+++ camel/trunk/components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/HazelcastSedaTransferExchangeTest.java Sun Jun 5 10:53:31 2011
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hazelcast;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+public class HazelcastSedaTransferExchangeTest extends CamelTestSupport {
+
+ @EndpointInject(uri = "mock:result")
+ private MockEndpoint mock;
+
+ @Test
+ public void testExchangeTransferEnabled() throws InterruptedException {
+ final String value = "CAMEL-3983";
+
+ mock.expectedMessageCount(1);
+ mock.expectedBodiesReceived("test");
+ mock.expectedHeaderReceived("test", value);
+
+ Exchange exchange = createExchangeWithBody("test");
+ exchange.getIn().setHeader("test", value);
+
+ template.send("direct:foobar", exchange);
+
+ assertMockEndpointsSatisfied();
+ mock.reset();
+ }
+
+ @Test(expected = AssertionError.class)
+ public void testExchangeTransferDisabled() throws InterruptedException {
+ mock.expectedMessageCount(1);
+ mock.expectedBodiesReceived("test");
+ mock.expectedHeaderReceived("test", "");
+
+ Exchange exchange = createExchangeWithBody("test");
+ exchange.getIn().setHeader("test", "fail...");
+
+ template.send("direct:foo", exchange);
+
+ assertMockEndpointsSatisfied();
+ mock.reset();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:foo").to("hazelcast:seda:foo");
+
+ from("direct:foobar").to("hazelcast:seda:foo?transferExchange=true");
+
+ from("hazelcast:seda:foo").to("mock:result");
+ }
+ };
+ }
+
+}