You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/05/06 16:38:11 UTC
svn commit: r941756 - in /camel/trunk/components:
camel-mina/src/main/java/org/apache/camel/component/mina/
camel-mina/src/test/java/org/apache/camel/component/mina/
camel-netty/src/main/java/org/apache/camel/component/netty/
camel-netty/src/main/java/...
Author: davsclaus
Date: Thu May 6 14:38:10 2010
New Revision: 941756
URL: http://svn.apache.org/viewvc?rev=941756&view=rev
Log:
Added transferExchange option to camel-netty.
Added:
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyPayloadHelper.java (with props)
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java (with props)
camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTransferExchangeOptionTest.java (with props)
Modified:
camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaPayloadHelper.java
camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTransferExchangeOptionTest.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaPayloadHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaPayloadHelper.java?rev=941756&r1=941755&r2=941756&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaPayloadHelper.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaPayloadHelper.java Thu May 6 14:38:10 2010
@@ -20,7 +20,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.impl.DefaultExchangeHolder;
/**
- * Helper to get and set the correct payload when transfering data using camel-mina.
+ * Helper to get and set the correct payload when transferring data using camel-mina.
* Always use this helper instead of direct access on the exchange object.
* <p/>
* This helper ensures that we can also transfer exchange objects over the wire using the
Modified: camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java?rev=941756&r1=941755&r2=941756&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java (original)
+++ camel/trunk/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaProducer.java Thu May 6 14:38:10 2010
@@ -243,7 +243,7 @@ public class MinaProducer extends Defaul
@Override
public void sessionClosed(IoSession session) throws Exception {
- if (sync && message == null) {
+ if (sync && !messageReceived) {
// sync=true (InOut mode) so we expected a message as reply but did not get one before the session is closed
if (LOG.isDebugEnabled()) {
LOG.debug("Session closed but no message received from address: " + this.endpoint.getAddress());
Modified: camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTransferExchangeOptionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTransferExchangeOptionTest.java?rev=941756&r1=941755&r2=941756&view=diff
==============================================================================
--- camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTransferExchangeOptionTest.java (original)
+++ camel/trunk/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTransferExchangeOptionTest.java Thu May 6 14:38:10 2010
@@ -38,7 +38,7 @@ public class MinaTransferExchangeOptionT
protected String uri = "mina:tcp://localhost:6321?sync=true&encoding=UTF-8&transferExchange=true";
- public void testMianTransferExchangeOptionWithoutException() throws Exception {
+ public void testMinaTransferExchangeOptionWithoutException() throws Exception {
Exchange exchange = sendExchange(false);
assertExchange(exchange, false);
}
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java?rev=941756&r1=941755&r2=941756&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConfiguration.java Thu May 6 14:38:10 2010
@@ -59,6 +59,7 @@ public class NettyConfiguration {
private String securityProvider;
private boolean disconnect;
private boolean lazyChannelCreation = true;
+ private boolean transferExchange;
public NettyConfiguration() {
setKeepAlive(true);
@@ -148,6 +149,9 @@ public class NettyConfiguration {
if (settings.containsKey("lazyChannelCreation")) {
setLazyChannelCreation(Boolean.valueOf((String) settings.get("lazyChannelCreation")));
}
+ if (settings.containsKey("transferExchange")) {
+ setTransferExchange(Boolean.valueOf((String) settings.get("transferExchange")));
+ }
}
public String getProtocol() {
@@ -378,6 +382,14 @@ public class NettyConfiguration {
this.lazyChannelCreation = lazyChannelCreation;
}
+ public boolean isTransferExchange() {
+ return transferExchange;
+ }
+
+ public void setTransferExchange(boolean transferExchange) {
+ this.transferExchange = transferExchange;
+ }
+
public String getAddress() {
return host + ":" + port;
}
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java?rev=941756&r1=941755&r2=941756&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyConstants.java Thu May 6 14:38:10 2010
@@ -26,6 +26,7 @@ public final class NettyConstants {
public static final String NETTY_CLOSE_CHANNEL_WHEN_COMPLETE = "CamelNettyCloseChannelWhenComplete";
public static final String NETTY_CHANNEL_HANDLER_CONTEXT = "CamelNettyChannelHandlerContext";
public static final String NETTY_MESSAGE_EVENT = "CamelNettyMessageEvent";
+ public static final String NETTY_REMOTE_ADDRESS = "CamelNettyRemoteAddress";
private NettyConstants() {
// Utility class
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java?rev=941756&r1=941755&r2=941756&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyEndpoint.java Thu May 6 14:38:10 2010
@@ -45,7 +45,9 @@ public class NettyEndpoint extends Defau
Exchange exchange = createExchange();
exchange.getIn().setHeader(NettyConstants.NETTY_CHANNEL_HANDLER_CONTEXT, ctx);
exchange.getIn().setHeader(NettyConstants.NETTY_MESSAGE_EVENT, messageEvent);
- return exchange;
+ exchange.getIn().setHeader(NettyConstants.NETTY_REMOTE_ADDRESS, messageEvent.getRemoteAddress());
+ NettyPayloadHelper.setIn(exchange, messageEvent.getMessage());
+ return exchange;
}
public boolean isSingleton() {
Added: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyPayloadHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyPayloadHelper.java?rev=941756&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyPayloadHelper.java (added)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyPayloadHelper.java Thu May 6 14:38:10 2010
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultExchangeHolder;
+
+/**
+ * Helper to get and set the correct payload when transferring data using camel-netty.
+ * Always use this helper instead of direct access on the exchange object.
+ * <p/>
+ * This helper ensures that we can also transfer exchange objects over the wire using the
+ * <tt>transferExchange=true</tt> option.
+ *
+ * @version $Revision$
+ */
+public final class NettyPayloadHelper {
+
+ public static Object getIn(NettyEndpoint endpoint, Exchange exchange) {
+ if (endpoint.getConfiguration().isTransferExchange()) {
+ // we should transfer the entire exchange over the wire (includes in/out)
+ return DefaultExchangeHolder.marshal(exchange);
+ } else {
+ // normal transfer using the body only
+ return exchange.getIn().getBody();
+ }
+ }
+
+ public static Object getOut(NettyEndpoint endpoint, Exchange exchange) {
+ if (endpoint.getConfiguration().isTransferExchange()) {
+ // we should transfer the entire exchange over the wire (includes in/out)
+ return DefaultExchangeHolder.marshal(exchange);
+ } else {
+ // normal transfer using the body only
+ return exchange.getOut().getBody();
+ }
+ }
+
+ public static void setIn(Exchange exchange, Object payload) {
+ if (payload instanceof DefaultExchangeHolder) {
+ DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) payload);
+ } else {
+ // normal transfer using the body only
+ exchange.getIn().setBody(payload);
+ }
+ }
+
+ public static void setOut(Exchange exchange, Object payload) {
+ if (payload instanceof DefaultExchangeHolder) {
+ DefaultExchangeHolder.unmarshal(exchange, (DefaultExchangeHolder) payload);
+ } else {
+ // normal transfer using the body only and preserve the headers
+ exchange.getOut().setHeaders(exchange.getIn().getHeaders());
+ exchange.getOut().setBody(payload);
+ }
+ }
+
+}
Propchange: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyPayloadHelper.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyPayloadHelper.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java?rev=941756&r1=941755&r2=941756&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java Thu May 6 14:38:10 2010
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelException;
+import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.ServicePoolAware;
@@ -108,20 +109,50 @@ public class NettyProducer extends Defau
openConnection();
}
+ Object body = NettyPayloadHelper.getIn(getEndpoint(), exchange);
+ if (body == null) {
+ LOG.warn("No payload to send for exchange: " + exchange);
+ return; // exit early since nothing to write
+ }
+
if (configuration.isSync()) {
+ // only initialize latch if we should get a response
countdownLatch = new CountDownLatch(1);
}
+ // log what we are writing
+ if (LOG.isDebugEnabled()) {
+ Object out = body;
+ if (body instanceof byte[]) {
+ // byte arrays is not readable so convert to string
+ out = exchange.getContext().getTypeConverter().convertTo(String.class, body);
+ }
+ LOG.debug("Writing body : " + out);
+ }
+
// write the body
- NettyHelper.writeBody(channel, null, exchange.getIn().getBody(), exchange);
+ NettyHelper.writeBody(channel, null, body, exchange);
if (configuration.isSync()) {
boolean success = countdownLatch.await(configuration.getReceiveTimeoutMillis(), TimeUnit.MILLISECONDS);
if (!success) {
throw new ExchangeTimedOutException(exchange, configuration.getReceiveTimeoutMillis());
}
- Object response = ((ClientChannelHandler) clientPipeline.get("handler")).getResponse();
- exchange.getOut().setBody(response);
+
+ ClientChannelHandler handler = (ClientChannelHandler) clientPipeline.get("handler");
+ if (handler.getCause() != null) {
+ throw new CamelExchangeException("Error occurred in ClientChannelHandler", exchange, handler.getCause());
+ } else if (!handler.isMessageReceived()) {
+ // no message received
+ throw new CamelExchangeException("No response received from remote server: " + configuration.getAddress(), exchange);
+ } else {
+ // set the result on either IN or OUT on the original exchange depending on its pattern
+ if (ExchangeHelper.isOutCapable(exchange)) {
+ NettyPayloadHelper.setOut(exchange, handler.getMessage());
+ } else {
+ NettyPayloadHelper.setIn(exchange, handler.getMessage());
+ }
+ }
}
// should channel be closed after complete?
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java?rev=941756&r1=941755&r2=941756&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java Thu May 6 14:38:10 2010
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.netty.handlers;
+import java.util.concurrent.CountDownLatch;
+
import org.apache.camel.CamelException;
import org.apache.camel.component.netty.NettyHelper;
import org.apache.camel.component.netty.NettyProducer;
@@ -32,13 +34,21 @@ import org.jboss.netty.channel.SimpleCha
public class ClientChannelHandler extends SimpleChannelUpstreamHandler {
private static final transient Log LOG = LogFactory.getLog(ClientChannelHandler.class);
private NettyProducer producer;
- private Object response;
-
+ private Object message;
+ private Throwable cause;
+ private boolean messageReceived;
+
public ClientChannelHandler(NettyProducer producer) {
super();
this.producer = producer;
}
+ public void reset() {
+ this.message = null;
+ this.cause = null;
+ this.messageReceived = false;
+ }
+
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent channelStateEvent) throws Exception {
// to keep track of open sockets
@@ -47,35 +57,59 @@ public class ClientChannelHandler extend
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent exceptionEvent) throws Exception {
+ this.message = null;
+ this.messageReceived = false;
+ this.cause = exceptionEvent.getCause();
+
if (LOG.isDebugEnabled()) {
- LOG.debug("Closing channel as an exception was thrown from Netty", exceptionEvent.getCause());
+ LOG.debug("Closing channel as an exception was thrown from Netty", cause);
}
// close channel in case an exception was thrown
NettyHelper.close(exceptionEvent.getChannel());
+ }
- // must wrap and rethrow since cause can be of Throwable and we must only throw Exception
- throw new CamelException(exceptionEvent.getCause());
+ @Override
+ public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ if (producer.getConfiguration().isSync() && !messageReceived) {
+ // sync=true (InOut mode) so we expected a message as reply but did not get one before the session is closed
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Channel closed but no message received from address: " + producer.getConfiguration().getAddress());
+ }
+ // session was closed but no message received. This could be because the remote server had an internal error
+ // and could not return a response. We should count down to stop waiting for a response
+ countDown();
+ }
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception {
- setResponse(messageEvent.getMessage());
+ message = messageEvent.getMessage();
+ messageReceived = true;
+ cause = null;
if (LOG.isDebugEnabled()) {
- LOG.debug("Incoming message:" + response);
+ LOG.debug("Message received: " + message);
}
+ // signal we have received message
+ countDown();
+ }
+
+ protected void countDown() {
if (producer.getConfiguration().isSync()) {
producer.getCountdownLatch().countDown();
- }
+ }
+ }
+
+ public Object getMessage() {
+ return message;
}
- public Object getResponse() {
- return response;
+ public boolean isMessageReceived() {
+ return messageReceived;
}
- public void setResponse(Object response) {
- this.response = response;
+ public Throwable getCause() {
+ return cause;
}
-
}
Modified: camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java?rev=941756&r1=941755&r2=941756&view=diff
==============================================================================
--- camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java (original)
+++ camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java Thu May 6 14:38:10 2010
@@ -22,6 +22,7 @@ import org.apache.camel.ExchangePattern;
import org.apache.camel.component.netty.NettyConstants;
import org.apache.camel.component.netty.NettyConsumer;
import org.apache.camel.component.netty.NettyHelper;
+import org.apache.camel.component.netty.NettyPayloadHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -76,7 +77,6 @@ public class ServerChannelHandler extend
if (consumer.getConfiguration().isSync()) {
exchange.setPattern(ExchangePattern.InOut);
}
- exchange.getIn().setBody(in);
try {
consumer.getProcessor().process(exchange);
@@ -93,17 +93,18 @@ public class ServerChannelHandler extend
private void sendResponse(MessageEvent messageEvent, Exchange exchange) throws Exception {
Object body;
if (ExchangeHelper.isOutCapable(exchange)) {
- body = exchange.getOut().getBody();
+ body = NettyPayloadHelper.getOut(consumer.getEndpoint(), exchange);
} else {
- body = exchange.getIn().getBody();
+ body = NettyPayloadHelper.getIn(consumer.getEndpoint(), exchange);
}
- if (exchange.isFailed()) {
- if (exchange.getException() == null) {
- // fault detected
- body = exchange.getOut().getBody();
- } else {
+ boolean failed = exchange.isFailed();
+ if (failed && !consumer.getEndpoint().getConfiguration().isTransferExchange()) {
+ if (exchange.getException() != null) {
body = exchange.getException();
+ } else {
+ // failed and no exception, must be a fault
+ body = exchange.getOut().getBody();
}
}
Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java?rev=941756&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java (added)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java Thu May 6 14:38:10 2010
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class NettyInOutWithForcedNoResponseTest extends CamelTestSupport {
+
+ @Test
+ public void testResponse() throws Exception {
+ Object out = template.requestBody("netty:tcp://localhost:4444", "Copenhagen");
+ assertEquals("Hello Claus", out);
+ }
+
+ @Test
+ public void testNoResponse() throws Exception {
+ try {
+ template.requestBody("netty:tcp://localhost:4444", "London");
+ fail("Should throw an exception");
+ } catch (RuntimeCamelException e) {
+ assertTrue(e.getCause().getMessage().startsWith("No response"));
+ }
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ from("netty:tcp://localhost:4444")
+ .choice()
+ .when(body().isEqualTo("Copenhagen")).transform(constant("Hello Claus"))
+ .otherwise().transform(constant(null));
+ }
+ };
+ }
+}
Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyInOutWithForcedNoResponseTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTransferExchangeOptionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTransferExchangeOptionTest.java?rev=941756&view=auto
==============================================================================
--- camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTransferExchangeOptionTest.java (added)
+++ camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTransferExchangeOptionTest.java Thu May 6 14:38:10 2010
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.nio.charset.Charset;
+
+import junit.framework.Assert;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+/**
+ * @version $Revision$
+ */
+public class NettyTransferExchangeOptionTest extends CamelTestSupport {
+
+ protected String uri = "netty:tcp://localhost:6321?transferExchange=true";
+
+ @Test
+ public void testNettyTransferExchangeOptionWithoutException() throws Exception {
+ Exchange exchange = sendExchange(false);
+ assertExchange(exchange, false);
+ }
+
+ @Test
+ public void testNettyTransferExchangeOptionWithException() throws Exception {
+ Exchange exchange = sendExchange(true);
+ assertExchange(exchange, true);
+ }
+
+ private Exchange sendExchange(boolean setException) throws Exception {
+ Endpoint endpoint = context.getEndpoint(uri);
+ Exchange exchange = endpoint.createExchange();
+
+ Message message = exchange.getIn();
+ message.setBody("Hello!");
+ message.setHeader("cheese", "feta");
+ exchange.setProperty("ham", "old");
+ exchange.setProperty("setException", setException);
+
+ Producer producer = endpoint.createProducer();
+ producer.start();
+ producer.process(exchange);
+
+ return exchange;
+ }
+
+ private void assertExchange(Exchange exchange, boolean hasFault) {
+ if (!hasFault) {
+ Message out = exchange.getOut();
+ assertNotNull(out);
+ assertFalse(out.isFault());
+ assertEquals("Goodbye!", out.getBody());
+ assertEquals("cheddar", out.getHeader("cheese"));
+ } else {
+ Message fault = exchange.getOut();
+ assertNotNull(fault);
+ assertTrue(fault.isFault());
+ assertNotNull(fault.getBody());
+ assertTrue("Should get the InterrupteException exception", fault.getBody() instanceof InterruptedException);
+ assertEquals("nihao", fault.getHeader("hello"));
+ }
+
+
+ // in should stay the same
+ Message in = exchange.getIn();
+ assertNotNull(in);
+ assertEquals("Hello!", in.getBody());
+ assertEquals("feta", in.getHeader("cheese"));
+ // however the shared properties have changed
+ assertEquals("fresh", exchange.getProperty("salami"));
+ assertNull(exchange.getProperty("Charset"));
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from(uri).process(new Processor() {
+ public void process(Exchange e) throws InterruptedException {
+ Assert.assertNotNull(e.getIn().getBody());
+ Assert.assertNotNull(e.getIn().getHeaders());
+ Assert.assertNotNull(e.getProperties());
+ Assert.assertEquals("Hello!", e.getIn().getBody());
+ Assert.assertEquals("feta", e.getIn().getHeader("cheese"));
+ Assert.assertEquals("old", e.getProperty("ham"));
+ Assert.assertEquals(ExchangePattern.InOut, e.getPattern());
+ Boolean setException = (Boolean) e.getProperty("setException");
+
+ if (setException) {
+ e.getOut().setFault(true);
+ e.getOut().setBody(new InterruptedException());
+ e.getOut().setHeader("hello", "nihao");
+ } else {
+ e.getOut().setBody("Goodbye!");
+ e.getOut().setHeader("cheese", "cheddar");
+ }
+ e.setProperty("salami", "fresh");
+ e.setProperty("Charset", Charset.defaultCharset());
+ }
+ });
+ }
+ };
+ }
+}
+
+
Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTransferExchangeOptionTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyTransferExchangeOptionTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date